This is an automated email from the ASF dual-hosted git repository.
freemandealer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7ec8f7d0d61 [fix](filecache) exclude warmup reads from file cache hit
ratio metrics (#63394)
7ec8f7d0d61 is described below
commit 7ec8f7d0d61d1ab47353ce095ac5b879a28fb15e
Author: zhengyu <[email protected]>
AuthorDate: Tue Jun 2 18:03:22 2026 +0800
[fix](filecache) exclude warmup reads from file cache hit ratio metrics
(#63394)
File cache hit ratio metrics are derived from the global file cache
read-byte counters, but warmup reads (manual, periodic, event-driven,
and rebalance-triggered warmup) used to update the same counters as
query reads, which polluted the query hit ratio. In addition, a read
that mixed cache hits and misses was attributed entirely to a single
source for the whole request. This change skips warmup updates to the
global file cache read metrics while preserving per-IOContext profile
stats, records local/remote/peer bytes by the bytes actually returned
from each source, and avoids updating metrics for failed reads. It also
fixes direct-read partial continuation, and refreshes the no-warmup hit
ratio when there are reads but no hits (the refresh was previously gated
on the hit-block count, so a miss-only period never updated the ratio).
---
be/src/io/cache/block_file_cache.cpp | 6 +-
be/src/io/cache/block_file_cache_profile.cpp | 37 ++
be/src/io/cache/block_file_cache_profile.h | 7 +-
be/src/io/cache/cached_remote_file_reader.cpp | 82 ++--
be/src/io/cache/file_cache_common.h | 3 +
be/src/storage/storage_engine.cpp | 1 +
.../block_file_cache_profile_reporter_test.cpp | 139 +++++++
be/test/io/cache/block_file_cache_test.cpp | 365 ++++++++++++++--
...st_file_cache_warmup_read_metrics_docker.groovy | 457 +++++++++++++++++++++
9 files changed, 1039 insertions(+), 58 deletions(-)
diff --git a/be/src/io/cache/block_file_cache.cpp
b/be/src/io/cache/block_file_cache.cpp
index 9f1fc29f1f1..dfbad8a63ed 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -2053,16 +2053,16 @@ void BlockFileCache::run_background_monitor() {
(double)_num_read_blocks_1h->get_value());
}
- if (_no_warmup_num_hit_blocks->get_value() > 0) {
+ if (_no_warmup_num_read_blocks->get_value() > 0) {
_no_warmup_hit_ratio->set_value((double)_no_warmup_num_hit_blocks->get_value() /
(double)_no_warmup_num_read_blocks->get_value());
}
- if (_no_warmup_num_hit_blocks_5m &&
_no_warmup_num_hit_blocks_5m->get_value() > 0) {
+ if (_no_warmup_num_read_blocks_5m &&
_no_warmup_num_read_blocks_5m->get_value() > 0) {
_no_warmup_hit_ratio_5m->set_value(
(double)_no_warmup_num_hit_blocks_5m->get_value() /
(double)_no_warmup_num_read_blocks_5m->get_value());
}
- if (_no_warmup_num_hit_blocks_1h &&
_no_warmup_num_hit_blocks_1h->get_value() > 0) {
+ if (_no_warmup_num_read_blocks_1h &&
_no_warmup_num_read_blocks_1h->get_value() > 0) {
_no_warmup_hit_ratio_1h->set_value(
(double)_no_warmup_num_hit_blocks_1h->get_value() /
(double)_no_warmup_num_read_blocks_1h->get_value());
diff --git a/be/src/io/cache/block_file_cache_profile.cpp
b/be/src/io/cache/block_file_cache_profile.cpp
index 692174dbbcb..8f9c167c998 100644
--- a/be/src/io/cache/block_file_cache_profile.cpp
+++ b/be/src/io/cache/block_file_cache_profile.cpp
@@ -65,6 +65,43 @@ void FileCacheMetrics::update_metrics_callback() {
stats->num_io_bytes_read_from_peer);
}
+FileCacheStatistics diff_file_cache_statistics(const FileCacheStatistics&
current,
+ const FileCacheStatistics&
previous) {
+ FileCacheStatistics diff;
+#define SUBTRACT_FIELD(field) diff.field = current.field - previous.field
+ SUBTRACT_FIELD(num_local_io_total);
+ SUBTRACT_FIELD(num_remote_io_total);
+ SUBTRACT_FIELD(num_peer_io_total);
+ SUBTRACT_FIELD(local_io_timer);
+ SUBTRACT_FIELD(bytes_read_from_local);
+ SUBTRACT_FIELD(bytes_read_from_remote);
+ SUBTRACT_FIELD(bytes_read_from_peer);
+ SUBTRACT_FIELD(remote_io_timer);
+ SUBTRACT_FIELD(peer_io_timer);
+ SUBTRACT_FIELD(remote_wait_timer);
+ SUBTRACT_FIELD(write_cache_io_timer);
+ SUBTRACT_FIELD(bytes_write_into_cache);
+ SUBTRACT_FIELD(num_skip_cache_io_total);
+ SUBTRACT_FIELD(read_cache_file_directly_timer);
+ SUBTRACT_FIELD(cache_get_or_set_timer);
+ SUBTRACT_FIELD(lock_wait_timer);
+ SUBTRACT_FIELD(get_timer);
+ SUBTRACT_FIELD(set_timer);
+
+ SUBTRACT_FIELD(inverted_index_num_local_io_total);
+ SUBTRACT_FIELD(inverted_index_num_remote_io_total);
+ SUBTRACT_FIELD(inverted_index_num_peer_io_total);
+ SUBTRACT_FIELD(inverted_index_bytes_read_from_local);
+ SUBTRACT_FIELD(inverted_index_bytes_read_from_remote);
+ SUBTRACT_FIELD(inverted_index_bytes_read_from_peer);
+ SUBTRACT_FIELD(inverted_index_local_io_timer);
+ SUBTRACT_FIELD(inverted_index_remote_io_timer);
+ SUBTRACT_FIELD(inverted_index_peer_io_timer);
+ SUBTRACT_FIELD(inverted_index_io_timer);
+#undef SUBTRACT_FIELD
+ return diff;
+}
+
FileCacheProfileReporter::FileCacheProfileReporter(RuntimeProfile* profile) {
static const char* cache_profile = "FileCache";
ADD_TIMER_WITH_LEVEL(profile, cache_profile, 2);
diff --git a/be/src/io/cache/block_file_cache_profile.h
b/be/src/io/cache/block_file_cache_profile.h
index 5fdb82fbd61..6c95e49791c 100644
--- a/be/src/io/cache/block_file_cache_profile.h
+++ b/be/src/io/cache/block_file_cache_profile.h
@@ -52,9 +52,9 @@ public:
}
void update(FileCacheStatistics* stats);
+ std::shared_ptr<AtomicStatistics> report();
private:
- std::shared_ptr<AtomicStatistics> report();
void register_entity();
void update_metrics_callback();
@@ -64,6 +64,9 @@ private:
std::shared_ptr<AtomicStatistics> _statistics;
};
+FileCacheStatistics diff_file_cache_statistics(const FileCacheStatistics&
current,
+ const FileCacheStatistics&
previous);
+
struct FileCacheProfileReporter {
RuntimeProfile::Counter* num_local_io_total = nullptr;
RuntimeProfile::Counter* num_remote_io_total = nullptr;
@@ -100,4 +103,4 @@ struct FileCacheProfileReporter {
};
} // namespace io
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/io/cache/cached_remote_file_reader.cpp
b/be/src/io/cache/cached_remote_file_reader.cpp
index c7f6ddffc5f..7c297737d60 100644
--- a/be/src/io/cache/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/cached_remote_file_reader.cpp
@@ -297,6 +297,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset,
Slice result, size_t*
ReadStatistics stats;
stats.bytes_read += bytes_req;
+ bool read_success = false;
MonotonicStopWatch read_at_sw;
read_at_sw.start();
auto defer_func = [&](int*) {
@@ -315,10 +316,15 @@ Status CachedRemoteFileReader::read_at_impl(size_t
offset, Slice result, size_t*
if (is_dryrun) {
return;
}
- // update stats increment in this reading procedure for file cache
metrics
- FileCacheStatistics fcache_stats_increment;
- _update_stats(stats, &fcache_stats_increment,
io_ctx->is_inverted_index);
- io::FileCacheMetrics::instance().update(&fcache_stats_increment);
+ if (!read_success) {
+ return;
+ }
+ if (!io_ctx->is_warmup) {
+ // update stats increment in this reading procedure for file cache
metrics
+ FileCacheStatistics fcache_stats_increment;
+ _update_stats(stats, &fcache_stats_increment,
io_ctx->is_inverted_index);
+ io::FileCacheMetrics::instance().update(&fcache_stats_increment);
+ }
if (io_ctx->file_cache_stats) {
// update stats in io_ctx, for query profile
_update_stats(stats, io_ctx->file_cache_stats,
io_ctx->is_inverted_index);
@@ -355,6 +361,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset,
Slice result, size_t*
.ok()) { //TODO: maybe read failed because
block evict, should handle error
break;
}
+ stats.bytes_read_from_local += reserve_bytes;
}
_cache->add_need_update_lru_block(iter->second);
need_read_size -= reserve_bytes;
@@ -367,6 +374,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset,
Slice result, size_t*
stats.hit_cache = true;
g_read_cache_direct_whole_num << 1;
g_read_cache_direct_whole_bytes << bytes_req;
+ read_success = true;
return Status::OK();
} else {
g_read_cache_direct_partial_num << 1;
@@ -442,6 +450,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset,
Slice result, size_t*
// Determine read type and execute remote read
RETURN_IF_ERROR(
_execute_remote_read(empty_blocks, empty_start, size, buffer,
stats, io_ctx));
+ bool empty_blocks_from_peer_cache = stats.from_peer_cache;
{
SCOPED_CONCURRENCY_COUNT(
@@ -476,6 +485,11 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset,
Slice result, size_t*
size_t copy_size = copy_right_offset - copy_left_offset + 1;
memcpy(dst, src, copy_size);
indirect_read_bytes += copy_size;
+ if (empty_blocks_from_peer_cache) {
+ stats.bytes_read_from_peer += copy_size;
+ } else {
+ stats.bytes_read_from_remote += copy_size;
+ }
}
}
@@ -536,7 +550,10 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset,
Slice result, size_t*
ConcurrencyStatsManager::instance().cached_remote_reader_local_read);
st = block->read(Slice(result.data + (current_offset -
offset), read_size),
file_offset);
- indirect_read_bytes += read_size;
+ if (st.ok()) {
+ indirect_read_bytes += read_size;
+ stats.bytes_read_from_local += read_size;
+ }
}
}
if (!st || block_state != FileBlock::State::DOWNLOADED) {
@@ -567,6 +584,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset,
Slice result, size_t*
&nest_bytes_read));
indirect_read_bytes += read_size;
DCHECK(nest_bytes_read == read_size);
+ stats.bytes_read_from_remote += nest_bytes_read;
}
}
}
@@ -580,6 +598,12 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset,
Slice result, size_t*
g_read_cache_indirect_total_bytes << *bytes_read;
DCHECK(*bytes_read == bytes_req);
+ if (!is_dryrun) {
+ DCHECK_EQ(stats.bytes_read_from_local + stats.bytes_read_from_remote +
+ stats.bytes_read_from_peer,
+ bytes_req);
+ }
+ read_success = true;
return Status::OK();
}
@@ -589,19 +613,19 @@ void CachedRemoteFileReader::_update_stats(const
ReadStatistics& read_stats,
if (statis == nullptr) {
return;
}
- if (read_stats.hit_cache) {
+ if (read_stats.bytes_read_from_local > 0) {
statis->num_local_io_total++;
- statis->bytes_read_from_local += read_stats.bytes_read;
- } else {
- if (read_stats.from_peer_cache) {
- statis->num_peer_io_total++;
- statis->bytes_read_from_peer += read_stats.bytes_read;
- statis->peer_io_timer += read_stats.peer_read_timer;
- } else {
- statis->num_remote_io_total++;
- statis->bytes_read_from_remote += read_stats.bytes_read;
- statis->remote_io_timer += read_stats.remote_read_timer;
- }
+ statis->bytes_read_from_local += read_stats.bytes_read_from_local;
+ }
+ if (read_stats.bytes_read_from_remote > 0) {
+ statis->num_remote_io_total++;
+ statis->bytes_read_from_remote += read_stats.bytes_read_from_remote;
+ statis->remote_io_timer += read_stats.remote_read_timer;
+ }
+ if (read_stats.bytes_read_from_peer > 0) {
+ statis->num_peer_io_total++;
+ statis->bytes_read_from_peer += read_stats.bytes_read_from_peer;
+ statis->peer_io_timer += read_stats.peer_read_timer;
}
statis->remote_wait_timer += read_stats.remote_wait_timer;
statis->local_io_timer += read_stats.local_read_timer;
@@ -616,19 +640,19 @@ void CachedRemoteFileReader::_update_stats(const
ReadStatistics& read_stats,
statis->set_timer += read_stats.set_timer;
if (is_inverted_index) {
- if (read_stats.hit_cache) {
+ if (read_stats.bytes_read_from_local > 0) {
statis->inverted_index_num_local_io_total++;
- statis->inverted_index_bytes_read_from_local +=
read_stats.bytes_read;
- } else {
- if (read_stats.from_peer_cache) {
- statis->inverted_index_num_peer_io_total++;
- statis->inverted_index_bytes_read_from_peer +=
read_stats.bytes_read;
- statis->inverted_index_peer_io_timer +=
read_stats.peer_read_timer;
- } else {
- statis->inverted_index_num_remote_io_total++;
- statis->inverted_index_bytes_read_from_remote +=
read_stats.bytes_read;
- statis->inverted_index_remote_io_timer +=
read_stats.remote_read_timer;
- }
+ statis->inverted_index_bytes_read_from_local +=
read_stats.bytes_read_from_local;
+ }
+ if (read_stats.bytes_read_from_remote > 0) {
+ statis->inverted_index_num_remote_io_total++;
+ statis->inverted_index_bytes_read_from_remote +=
read_stats.bytes_read_from_remote;
+ statis->inverted_index_remote_io_timer +=
read_stats.remote_read_timer;
+ }
+ if (read_stats.bytes_read_from_peer > 0) {
+ statis->inverted_index_num_peer_io_total++;
+ statis->inverted_index_bytes_read_from_peer +=
read_stats.bytes_read_from_peer;
+ statis->inverted_index_peer_io_timer += read_stats.peer_read_timer;
}
statis->inverted_index_local_io_timer += read_stats.local_read_timer;
}
diff --git a/be/src/io/cache/file_cache_common.h
b/be/src/io/cache/file_cache_common.h
index 43510293af8..8fde90abc0a 100644
--- a/be/src/io/cache/file_cache_common.h
+++ b/be/src/io/cache/file_cache_common.h
@@ -71,6 +71,9 @@ struct ReadStatistics {
bool from_peer_cache = false;
bool skip_cache = false;
int64_t bytes_read = 0;
+ int64_t bytes_read_from_local = 0;
+ int64_t bytes_read_from_remote = 0;
+ int64_t bytes_read_from_peer = 0;
int64_t bytes_write_into_file_cache = 0;
int64_t remote_read_timer = 0;
int64_t peer_read_timer = 0;
diff --git a/be/src/storage/storage_engine.cpp
b/be/src/storage/storage_engine.cpp
index 92f919f9831..9f5b545871c 100644
--- a/be/src/storage/storage_engine.cpp
+++ b/be/src/storage/storage_engine.cpp
@@ -257,6 +257,7 @@ StorageEngine::StorageEngine(const EngineOptions& options)
}
StorageEngine::~StorageEngine() {
+ DEREGISTER_HOOK_METRIC(unused_rowsets_count);
stop();
}
diff --git a/be/test/io/cache/block_file_cache_profile_reporter_test.cpp
b/be/test/io/cache/block_file_cache_profile_reporter_test.cpp
new file mode 100644
index 00000000000..e74ad758ac1
--- /dev/null
+++ b/be/test/io/cache/block_file_cache_profile_reporter_test.cpp
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "io/cache/block_file_cache_profile.h"
+
+namespace doris {
+namespace {
+
+io::FileCacheStatistics make_file_cache_stats(int64_t multiplier) {
+ io::FileCacheStatistics stats;
+ stats.num_local_io_total = multiplier;
+ stats.num_remote_io_total = multiplier * 2;
+ stats.num_peer_io_total = multiplier * 3;
+ stats.local_io_timer = multiplier * 4;
+ stats.bytes_read_from_local = multiplier * 5;
+ stats.bytes_read_from_remote = multiplier * 6;
+ stats.bytes_read_from_peer = multiplier * 7;
+ stats.remote_io_timer = multiplier * 8;
+ stats.peer_io_timer = multiplier * 9;
+ stats.remote_wait_timer = multiplier * 10;
+ stats.write_cache_io_timer = multiplier * 11;
+ stats.bytes_write_into_cache = multiplier * 12;
+ stats.num_skip_cache_io_total = multiplier * 13;
+ stats.read_cache_file_directly_timer = multiplier * 14;
+ stats.cache_get_or_set_timer = multiplier * 15;
+ stats.lock_wait_timer = multiplier * 16;
+ stats.get_timer = multiplier * 17;
+ stats.set_timer = multiplier * 18;
+ stats.inverted_index_num_local_io_total = multiplier * 19;
+ stats.inverted_index_num_remote_io_total = multiplier * 20;
+ stats.inverted_index_num_peer_io_total = multiplier * 21;
+ stats.inverted_index_bytes_read_from_local = multiplier * 22;
+ stats.inverted_index_bytes_read_from_remote = multiplier * 23;
+ stats.inverted_index_bytes_read_from_peer = multiplier * 24;
+ stats.inverted_index_local_io_timer = multiplier * 25;
+ stats.inverted_index_remote_io_timer = multiplier * 26;
+ stats.inverted_index_peer_io_timer = multiplier * 27;
+ stats.inverted_index_io_timer = multiplier * 28;
+ return stats;
+}
+
+void expect_file_cache_stats_eq(const io::FileCacheStatistics& actual,
+ const io::FileCacheStatistics& expected) {
+ EXPECT_EQ(actual.num_local_io_total, expected.num_local_io_total);
+ EXPECT_EQ(actual.num_remote_io_total, expected.num_remote_io_total);
+ EXPECT_EQ(actual.num_peer_io_total, expected.num_peer_io_total);
+ EXPECT_EQ(actual.local_io_timer, expected.local_io_timer);
+ EXPECT_EQ(actual.bytes_read_from_local, expected.bytes_read_from_local);
+ EXPECT_EQ(actual.bytes_read_from_remote, expected.bytes_read_from_remote);
+ EXPECT_EQ(actual.bytes_read_from_peer, expected.bytes_read_from_peer);
+ EXPECT_EQ(actual.remote_io_timer, expected.remote_io_timer);
+ EXPECT_EQ(actual.peer_io_timer, expected.peer_io_timer);
+ EXPECT_EQ(actual.remote_wait_timer, expected.remote_wait_timer);
+ EXPECT_EQ(actual.write_cache_io_timer, expected.write_cache_io_timer);
+ EXPECT_EQ(actual.bytes_write_into_cache, expected.bytes_write_into_cache);
+ EXPECT_EQ(actual.num_skip_cache_io_total,
expected.num_skip_cache_io_total);
+ EXPECT_EQ(actual.read_cache_file_directly_timer,
expected.read_cache_file_directly_timer);
+ EXPECT_EQ(actual.cache_get_or_set_timer, expected.cache_get_or_set_timer);
+ EXPECT_EQ(actual.lock_wait_timer, expected.lock_wait_timer);
+ EXPECT_EQ(actual.get_timer, expected.get_timer);
+ EXPECT_EQ(actual.set_timer, expected.set_timer);
+ EXPECT_EQ(actual.inverted_index_num_local_io_total,
expected.inverted_index_num_local_io_total);
+ EXPECT_EQ(actual.inverted_index_num_remote_io_total,
+ expected.inverted_index_num_remote_io_total);
+ EXPECT_EQ(actual.inverted_index_num_peer_io_total,
expected.inverted_index_num_peer_io_total);
+ EXPECT_EQ(actual.inverted_index_bytes_read_from_local,
+ expected.inverted_index_bytes_read_from_local);
+ EXPECT_EQ(actual.inverted_index_bytes_read_from_remote,
+ expected.inverted_index_bytes_read_from_remote);
+ EXPECT_EQ(actual.inverted_index_bytes_read_from_peer,
+ expected.inverted_index_bytes_read_from_peer);
+ EXPECT_EQ(actual.inverted_index_local_io_timer,
expected.inverted_index_local_io_timer);
+ EXPECT_EQ(actual.inverted_index_remote_io_timer,
expected.inverted_index_remote_io_timer);
+ EXPECT_EQ(actual.inverted_index_peer_io_timer,
expected.inverted_index_peer_io_timer);
+ EXPECT_EQ(actual.inverted_index_io_timer,
expected.inverted_index_io_timer);
+}
+
+} // namespace
+
+TEST(FileCacheProfileReporterTest, DiffReturnsFullStatsWhenPreviousIsZero) {
+ const auto current = make_file_cache_stats(3);
+
+ expect_file_cache_stats_eq(io::diff_file_cache_statistics(current, {}),
current);
+}
+
+TEST(FileCacheProfileReporterTest, DiffReturnsOnlyIncrementalDelta) {
+ expect_file_cache_stats_eq(
+ io::diff_file_cache_statistics(make_file_cache_stats(5),
make_file_cache_stats(3)),
+ make_file_cache_stats(2));
+}
+
+TEST(FileCacheProfileReporterTest, DiffReturnsZeroWithoutNewData) {
+ const auto current = make_file_cache_stats(4);
+
+ expect_file_cache_stats_eq(io::diff_file_cache_statistics(current,
current),
+ make_file_cache_stats(0));
+}
+
+TEST(FileCacheProfileReporterTest,
ReporterAggregatesDeltaReportsToExactFinalTotals) {
+ auto profile = std::make_unique<RuntimeProfile>("test_profile");
+ io::FileCacheProfileReporter reporter(profile.get());
+
+ const auto after_first_report = make_file_cache_stats(4);
+ const auto after_second_report = make_file_cache_stats(7);
+ const auto first_delta =
io::diff_file_cache_statistics(after_first_report, {});
+ reporter.update(&first_delta);
+
+ const auto second_delta =
+ io::diff_file_cache_statistics(after_second_report,
after_first_report);
+ reporter.update(&second_delta);
+
+ EXPECT_EQ(profile->get_counter("BytesScannedFromCache")->value(),
+ after_second_report.bytes_read_from_local);
+ EXPECT_EQ(profile->get_counter("BytesScannedFromRemote")->value(),
+ after_second_report.bytes_read_from_remote);
+ EXPECT_EQ(profile->get_counter("BytesWriteIntoCache")->value(),
+ after_second_report.bytes_write_into_cache);
+ EXPECT_EQ(profile->get_counter("CacheGetOrSetTimer")->value(),
+ after_second_report.cache_get_or_set_timer);
+ EXPECT_EQ(profile->get_counter("LockWaitTimer")->value(),
after_second_report.lock_wait_timer);
+}
+
+} // namespace doris
diff --git a/be/test/io/cache/block_file_cache_test.cpp
b/be/test/io/cache/block_file_cache_test.cpp
index cf4087a2624..b57e1285f87 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -19,6 +19,7 @@
// and modified by Doris
#include "io/cache/block_file_cache_test_common.h"
+#include "io/fs/buffered_reader.h"
#include "storage/olap_define.h"
namespace doris::io {
@@ -43,6 +44,114 @@ fs::path caches_dir = fs::current_path() / "lru_cache_test";
std::string cache_base_path = caches_dir / "cache1" / "";
std::string tmp_file = caches_dir / "tmp_file";
+io::FileCacheSettings cached_remote_reader_cache_settings() {
+ io::FileCacheSettings settings;
+ settings.query_queue_size = 6291456;
+ settings.query_queue_elements = 6;
+ settings.index_queue_size = 1048576;
+ settings.index_queue_elements = 1;
+ settings.disposable_queue_size = 1048576;
+ settings.disposable_queue_elements = 1;
+ settings.capacity = 8388608;
+ settings.max_file_block_size = 1048576;
+ settings.max_query_cache_size = 0;
+ return settings;
+}
+
+void reset_file_cache_factory_for_test() {
+ FileCacheFactory::instance()->_caches.clear();
+ FileCacheFactory::instance()->_path_to_cache.clear();
+ FileCacheFactory::instance()->_capacity = 0;
+}
+
+Status create_cached_remote_reader_cache(const std::string& cache_path,
BlockFileCache** cache) {
+ reset_file_cache_factory_for_test();
+ if (fs::exists(cache_path)) {
+ fs::remove_all(cache_path);
+ }
+ fs::create_directories(cache_path);
+
+ RETURN_IF_ERROR(FileCacheFactory::instance()->create_file_cache(
+ cache_path, cached_remote_reader_cache_settings()));
+ *cache = FileCacheFactory::instance()->_path_to_cache[cache_path];
+ DORIS_CHECK(*cache != nullptr);
+ for (int i = 0; i < 100; i++) {
+ if ((*cache)->get_async_open_success()) {
+ return Status::OK();
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ return Status::TimedOut("file cache async open timeout for path {}",
cache_path);
+}
+
+void cleanup_cached_remote_reader_cache(const std::string& cache_path) {
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ if (fs::exists(cache_path)) {
+ fs::remove_all(cache_path);
+ }
+ reset_file_cache_factory_for_test();
+}
+
+struct FileCacheMetricSnapshot {
+ int64_t total = 0;
+ int64_t local = 0;
+ int64_t remote = 0;
+ int64_t peer = 0;
+};
+
+FileCacheMetricSnapshot get_file_cache_metric_snapshot() {
+ FileCacheStatistics empty_stats;
+ FileCacheMetrics::instance().update(&empty_stats);
+ std::shared_ptr<AtomicStatistics> stats =
FileCacheMetrics::instance().report();
+ auto local = stats->num_io_bytes_read_from_cache.load();
+ auto remote = stats->num_io_bytes_read_from_remote.load();
+ auto peer = stats->num_io_bytes_read_from_peer.load();
+ return {.total = local + remote + peer, .local = local, .remote = remote,
.peer = peer};
+}
+
+void expect_file_cache_metric_delta(const FileCacheMetricSnapshot& before,
+ const FileCacheMetricSnapshot& after,
int64_t local,
+ int64_t remote, int64_t peer) {
+ EXPECT_EQ(after.local - before.local, local);
+ EXPECT_EQ(after.remote - before.remote, remote);
+ EXPECT_EQ(after.peer - before.peer, peer);
+ EXPECT_EQ(after.total - before.total, local + remote + peer);
+}
+
+class FailAfterOffsetFileReader : public FileReader {
+public:
+ FailAfterOffsetFileReader(FileReaderSPtr reader, size_t fail_offset)
+ : _reader(std::move(reader)), _fail_offset(fail_offset) {}
+ ~FailAfterOffsetFileReader() override = default;
+
+ void set_fail(bool fail) { _fail = fail; }
+
+ Status close() override { return _reader->close(); }
+
+ const Path& path() const override { return _reader->path(); }
+
+ size_t size() const override { return _reader->size(); }
+
+ bool closed() const override { return _reader->closed(); }
+
+ int64_t mtime() const override { return _reader->mtime(); }
+
+protected:
+ Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+ const IOContext* io_ctx) override {
+ if (_fail && offset >= _fail_offset) {
+ *bytes_read = 0;
+ return Status::IOError("inject remote read failure at offset {}",
offset);
+ }
+ return _reader->read_at(offset, result, bytes_read, io_ctx);
+ }
+
+private:
+ FileReaderSPtr _reader;
+ size_t _fail_offset;
+ bool _fail = false;
+};
+
void assert_range([[maybe_unused]] size_t assert_n, io::FileBlockSPtr
file_block,
const io::FileBlock::Range& expected_range,
io::FileBlock::State expected_state) {
auto range = file_block->range();
@@ -3798,6 +3907,136 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_error_handle) {
extern bvar::Adder<uint64_t> g_read_cache_self_heal_on_not_found;
+TEST_F(BlockFileCacheTest,
cached_remote_file_reader_warmup_does_not_update_global_metrics) {
+ bool origin_enable_direct_read = config::enable_read_cache_file_directly;
+ config::enable_read_cache_file_directly = false;
+ Defer reset_direct_read {
+ [&] { config::enable_read_cache_file_directly =
origin_enable_direct_read; }};
+
+ std::string cache_base_path = caches_dir /
"cached_remote_reader_warmup_metrics" / "";
+ BlockFileCache* cache = nullptr;
+ ASSERT_TRUE(create_cached_remote_reader_cache(cache_base_path,
&cache).ok());
+ Defer cleanup_cache {[&] {
cleanup_cached_remote_reader_cache(cache_base_path); }};
+
+ FileReaderSPtr local_reader;
+ ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader));
+ io::FileReaderOptions opts;
+ opts.cache_type = io::cache_type_from_string("file_block_cache");
+ opts.is_doris_table = true;
+ opts.tablet_id = 10086;
+ CachedRemoteFileReader reader(local_reader, opts);
+
+ std::string buffer(64_kb, '\0');
+ size_t bytes_read = 0;
+
+ auto before_normal_read = get_file_cache_metric_snapshot();
+ FileCacheStatistics normal_stats;
+ IOContext normal_ctx;
+ normal_ctx.file_cache_stats = &normal_stats;
+ ASSERT_TRUE(
+ reader.read_at(0, Slice(buffer.data(), buffer.size()),
&bytes_read, &normal_ctx).ok());
+ EXPECT_EQ(bytes_read, 64_kb);
+ EXPECT_EQ(std::string(64_kb, '0'), buffer);
+ EXPECT_EQ(normal_stats.bytes_read_from_local, 0);
+ EXPECT_EQ(normal_stats.bytes_read_from_remote, 64_kb);
+ EXPECT_EQ(normal_stats.bytes_read_from_peer, 0);
+ EXPECT_EQ(normal_stats.num_local_io_total, 0);
+ EXPECT_EQ(normal_stats.num_remote_io_total, 1);
+ auto after_normal_read = get_file_cache_metric_snapshot();
+ expect_file_cache_metric_delta(before_normal_read, after_normal_read, 0,
64_kb, 0);
+
+ auto before_warmup_miss = get_file_cache_metric_snapshot();
+ FileCacheStatistics warmup_miss_stats;
+ IOContext warmup_miss_ctx;
+ warmup_miss_ctx.file_cache_stats = &warmup_miss_stats;
+ warmup_miss_ctx.is_warmup = true;
+ buffer.assign(64_kb, '\0');
+ ASSERT_TRUE(
+ reader.read_at(2_mb, Slice(buffer.data(), buffer.size()),
&bytes_read, &warmup_miss_ctx)
+ .ok());
+ EXPECT_EQ(bytes_read, 64_kb);
+ EXPECT_EQ(std::string(64_kb, '2'), buffer);
+ EXPECT_EQ(warmup_miss_stats.bytes_read_from_local, 0);
+ EXPECT_EQ(warmup_miss_stats.bytes_read_from_remote, 64_kb);
+ EXPECT_EQ(warmup_miss_stats.bytes_read_from_peer, 0);
+ EXPECT_EQ(warmup_miss_stats.num_remote_io_total, 1);
+ auto after_warmup_miss = get_file_cache_metric_snapshot();
+ expect_file_cache_metric_delta(before_warmup_miss, after_warmup_miss, 0,
0, 0);
+
+ auto before_warmup_hit = get_file_cache_metric_snapshot();
+ FileCacheStatistics warmup_hit_stats;
+ IOContext warmup_hit_ctx;
+ warmup_hit_ctx.file_cache_stats = &warmup_hit_stats;
+ warmup_hit_ctx.is_warmup = true;
+ buffer.assign(64_kb, '\0');
+ ASSERT_TRUE(reader.read_at(0, Slice(buffer.data(), buffer.size()),
&bytes_read, &warmup_hit_ctx)
+ .ok());
+ EXPECT_EQ(bytes_read, 64_kb);
+ EXPECT_EQ(std::string(64_kb, '0'), buffer);
+ EXPECT_EQ(warmup_hit_stats.bytes_read_from_local, 64_kb);
+ EXPECT_EQ(warmup_hit_stats.bytes_read_from_remote, 0);
+ EXPECT_EQ(warmup_hit_stats.bytes_read_from_peer, 0);
+ EXPECT_EQ(warmup_hit_stats.num_local_io_total, 1);
+ auto after_warmup_hit = get_file_cache_metric_snapshot();
+ expect_file_cache_metric_delta(before_warmup_hit, after_warmup_hit, 0, 0,
0);
+
+ EXPECT_TRUE(reader.close().ok());
+ EXPECT_TRUE(reader.closed());
+}
+
+TEST_F(BlockFileCacheTest,
cached_remote_file_reader_mixed_hit_miss_stats_are_split_by_bytes) {
+ bool origin_enable_direct_read = config::enable_read_cache_file_directly;
+ config::enable_read_cache_file_directly = false;
+ Defer reset_direct_read {
+ [&] { config::enable_read_cache_file_directly =
origin_enable_direct_read; }};
+
+ std::string cache_base_path = caches_dir /
"cached_remote_reader_mixed_hit_miss_stats" / "";
+ BlockFileCache* cache = nullptr;
+ ASSERT_TRUE(create_cached_remote_reader_cache(cache_base_path,
&cache).ok());
+ Defer cleanup_cache {[&] {
cleanup_cached_remote_reader_cache(cache_base_path); }};
+
+ FileReaderSPtr local_reader;
+ ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader));
+ io::FileReaderOptions opts;
+ opts.cache_type = io::cache_type_from_string("file_block_cache");
+ opts.is_doris_table = true;
+ opts.tablet_id = 10086;
+ CachedRemoteFileReader reader(local_reader, opts);
+
+ std::string buffer(64_kb, '\0');
+ size_t bytes_read = 0;
+ FileCacheStatistics prime_stats;
+ IOContext prime_ctx;
+ prime_ctx.file_cache_stats = &prime_stats;
+ ASSERT_TRUE(
+ reader.read_at(0, Slice(buffer.data(), buffer.size()),
&bytes_read, &prime_ctx).ok());
+ EXPECT_EQ(bytes_read, 64_kb);
+ EXPECT_EQ(prime_stats.bytes_read_from_remote, 64_kb);
+
+ auto before_mixed_read = get_file_cache_metric_snapshot();
+ FileCacheStatistics mixed_stats;
+ IOContext mixed_ctx;
+ mixed_ctx.file_cache_stats = &mixed_stats;
+ buffer.assign(64_kb, '\0');
+ ASSERT_TRUE(reader.read_at(1_mb - 32_kb, Slice(buffer.data(),
buffer.size()), &bytes_read,
+ &mixed_ctx)
+ .ok());
+ EXPECT_EQ(bytes_read, 64_kb);
+ EXPECT_EQ(buffer.substr(0, 32_kb), std::string(32_kb, '0'));
+ EXPECT_EQ(buffer.substr(32_kb), std::string(32_kb, '1'));
+ EXPECT_EQ(mixed_stats.bytes_read_from_local, 32_kb);
+ EXPECT_EQ(mixed_stats.bytes_read_from_remote, 32_kb);
+ EXPECT_EQ(mixed_stats.bytes_read_from_peer, 0);
+ EXPECT_EQ(mixed_stats.num_local_io_total, 1);
+ EXPECT_EQ(mixed_stats.num_remote_io_total, 1);
+ EXPECT_EQ(mixed_stats.num_peer_io_total, 0);
+ auto after_mixed_read = get_file_cache_metric_snapshot();
+ expect_file_cache_metric_delta(before_mixed_read, after_mixed_read, 32_kb,
32_kb, 0);
+
+ EXPECT_TRUE(reader.close().ok());
+ EXPECT_TRUE(reader.closed());
+}
+
TEST_F(BlockFileCacheTest,
cached_remote_file_reader_self_heal_on_downloaded_not_found) {
bool origin_enable_direct_read = config::enable_read_cache_file_directly;
config::enable_read_cache_file_directly = false;
@@ -3823,6 +4062,7 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_self_heal_on_downloaded_not
settings.max_query_cache_size = 0;
ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path,
settings).ok());
auto cache = FileCacheFactory::instance()->_path_to_cache[cache_base_path];
+ Defer cleanup_cache {[&] {
cleanup_cached_remote_reader_cache(cache_base_path); }};
for (int i = 0; i < 100; i++) {
if (cache->get_async_open_success()) {
break;
@@ -3832,11 +4072,12 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_self_heal_on_downloaded_not
FileReaderSPtr local_reader;
ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader));
+ auto memory_reader = std::make_shared<InMemoryFileReader>(local_reader);
io::FileReaderOptions opts;
opts.cache_type = io::cache_type_from_string("file_block_cache");
opts.is_doris_table = true;
opts.tablet_id = 10086;
- CachedRemoteFileReader reader(local_reader, opts);
+ CachedRemoteFileReader reader(memory_reader, opts);
uint64_t before_self_heal =
g_read_cache_self_heal_on_not_found.get_value();
@@ -3847,25 +4088,39 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_self_heal_on_downloaded_not
size_t bytes_read = 0;
ASSERT_TRUE(reader.read_at(0, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx).ok());
EXPECT_EQ(std::string(64_kb, '0'), buffer);
+ EXPECT_EQ(stats.bytes_read_from_local, 0);
+ EXPECT_EQ(stats.bytes_read_from_remote, 64_kb);
+ EXPECT_EQ(stats.bytes_read_from_peer, 0);
auto key = io::BlockFileCache::hash("tmp_file");
- {
- io::CacheContext inspect_ctx;
- ReadStatistics inspect_stats;
- inspect_ctx.stats = &inspect_stats;
- inspect_ctx.cache_type = io::FileCacheType::NORMAL;
- auto inspect_holder = cache->get_or_set(key, 0, 64_kb, inspect_ctx);
- auto inspect_blocks = fromHolder(inspect_holder);
- ASSERT_EQ(inspect_blocks.size(), 1);
- ASSERT_EQ(inspect_blocks[0]->state(),
io::FileBlock::State::DOWNLOADED);
- std::string cache_file = inspect_blocks[0]->get_cache_file();
- ASSERT_TRUE(fs::exists(cache_file));
- ASSERT_TRUE(global_local_filesystem()->delete_file(cache_file).ok());
- ASSERT_FALSE(fs::exists(cache_file));
- }
- ASSERT_TRUE(reader.read_at(0, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx).ok());
+ auto* sp = SyncPoint::get_instance();
+ sp->enable_processing();
+ Defer defer {[&] {
+ sp->clear_call_back("LocalFileReader::read_at_impl");
+ sp->disable_processing();
+ }};
+ sp->set_call_back("LocalFileReader::read_at_impl", [&](auto&& values) {
+ std::pair<Status, bool>* pair = try_any_cast<std::pair<Status,
bool>*>(values.back());
+ pair->first = Status::NotFound("inject cache file not found");
+ pair->second = true;
+ });
+
+ auto before_fallback_read = get_file_cache_metric_snapshot();
+ FileCacheStatistics fallback_stats;
+ IOContext fallback_ctx;
+ fallback_ctx.file_cache_stats = &fallback_stats;
+ ASSERT_TRUE(reader.read_at(0, Slice(buffer.data(), buffer.size()),
&bytes_read, &fallback_ctx)
+ .ok());
EXPECT_EQ(std::string(64_kb, '0'), buffer);
+ EXPECT_EQ(fallback_stats.bytes_read_from_local, 0);
+ EXPECT_EQ(fallback_stats.bytes_read_from_remote, 64_kb);
+ EXPECT_EQ(fallback_stats.bytes_read_from_peer, 0);
+ EXPECT_EQ(fallback_stats.num_local_io_total, 0);
+ EXPECT_EQ(fallback_stats.num_remote_io_total, 1);
+ EXPECT_EQ(fallback_stats.num_peer_io_total, 0);
+ auto after_fallback_read = get_file_cache_metric_snapshot();
+ expect_file_cache_metric_delta(before_fallback_read, after_fallback_read,
0, 64_kb, 0);
bool self_healed = false;
for (int i = 0; i < 100; ++i) {
@@ -3886,13 +4141,6 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_self_heal_on_downloaded_not
EXPECT_TRUE(reader.close().ok());
EXPECT_TRUE(reader.closed());
- std::this_thread::sleep_for(std::chrono::seconds(1));
- if (fs::exists(cache_base_path)) {
- fs::remove_all(cache_base_path);
- }
- FileCacheFactory::instance()->_caches.clear();
- FileCacheFactory::instance()->_path_to_cache.clear();
- FileCacheFactory::instance()->_capacity = 0;
}
TEST_F(BlockFileCacheTest,
cached_remote_file_reader_no_self_heal_on_non_not_found_error) {
@@ -3983,6 +4231,60 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_no_self_heal_on_non_not_fou
FileCacheFactory::instance()->_capacity = 0;
}
+TEST_F(BlockFileCacheTest,
cached_remote_file_reader_failed_read_does_not_update_metrics) {
+ bool origin_enable_direct_read = config::enable_read_cache_file_directly;
+ config::enable_read_cache_file_directly = true;
+ Defer reset_direct_read {
+ [&] { config::enable_read_cache_file_directly =
origin_enable_direct_read; }};
+
+ std::string cache_base_path = caches_dir /
"cached_remote_reader_failed_read_metrics" / "";
+ BlockFileCache* cache = nullptr;
+ ASSERT_TRUE(create_cached_remote_reader_cache(cache_base_path,
&cache).ok());
+ Defer cleanup_cache {[&] {
cleanup_cached_remote_reader_cache(cache_base_path); }};
+
+ FileReaderSPtr local_reader;
+ ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader));
+ auto remote_reader = std::make_shared<InMemoryFileReader>(local_reader);
+ io::FileReaderOptions opts;
+ opts.cache_type = io::cache_type_from_string("file_block_cache");
+ opts.is_doris_table = true;
+ opts.tablet_id = 10086;
+ auto failing_remote_reader =
+ std::make_shared<FailAfterOffsetFileReader>(remote_reader,
static_cast<size_t>(1_mb));
+ CachedRemoteFileReader reader(failing_remote_reader, opts);
+
+ std::string buffer(64_kb, '\0');
+ size_t bytes_read = 0;
+ FileCacheStatistics prime_stats;
+ IOContext prime_ctx;
+ prime_ctx.file_cache_stats = &prime_stats;
+ ASSERT_TRUE(
+ reader.read_at(0, Slice(buffer.data(), buffer.size()),
&bytes_read, &prime_ctx).ok());
+ EXPECT_EQ(bytes_read, 64_kb);
+ EXPECT_EQ(prime_stats.bytes_read_from_remote, 64_kb);
+ EXPECT_EQ(cache->_cur_cache_size, 1_mb);
+
+ failing_remote_reader->set_fail(true);
+
+ auto before_failed_read = get_file_cache_metric_snapshot();
+ FileCacheStatistics failed_stats;
+ IOContext failed_ctx;
+ failed_ctx.file_cache_stats = &failed_stats;
+ buffer.assign(64_kb, '\0');
+ auto st = reader.read_at(1_mb - 100, Slice(buffer.data(), buffer.size()),
&bytes_read,
+ &failed_ctx);
+ ASSERT_FALSE(st.ok());
+ auto after_failed_read = get_file_cache_metric_snapshot();
+ expect_file_cache_metric_delta(before_failed_read, after_failed_read, 0,
0, 0);
+ EXPECT_EQ(buffer.substr(0, 100), std::string(100, '0'));
+ EXPECT_EQ(failed_stats.bytes_read_from_local, 0);
+ EXPECT_EQ(failed_stats.bytes_read_from_remote, 0);
+ EXPECT_EQ(failed_stats.bytes_read_from_peer, 0);
+
+ EXPECT_TRUE(reader.close().ok());
+ EXPECT_TRUE(reader.closed());
+}
+
TEST_F(BlockFileCacheTest, cached_remote_file_reader_init) {
std::string cache_base_path = caches_dir /
"cached_remote_file_reader_init" / "";
if (fs::exists(cache_base_path)) {
@@ -8081,7 +8383,10 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_direct_read_bytes_check) {
config::enable_evict_file_cache_in_advance = false;
config::file_cache_enter_disk_resource_limit_mode_percent = 99;
+ bool origin_enable_direct_read = config::enable_read_cache_file_directly;
config::enable_read_cache_file_directly = true;
+ Defer reset_direct_read {
+ [&] { config::enable_read_cache_file_directly =
origin_enable_direct_read; }};
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
@@ -8148,10 +8453,22 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_direct_read_bytes_check) {
64_kb);
// try to read first two blocks
+ FileCacheStatistics partial_stats;
+ IOContext partial_io_ctx;
+ partial_io_ctx.file_cache_stats = &partial_stats;
+ auto before_partial_read = get_file_cache_metric_snapshot();
ASSERT_TRUE(reader->read_at(1048576 - 100, Slice(buffer.data(),
buffer.size()), &bytes_read,
- &io_ctx)
+ &partial_io_ctx)
.ok());
std::this_thread::sleep_for(std::chrono::seconds(1));
+ EXPECT_EQ(partial_stats.bytes_read_from_local, 100);
+ EXPECT_EQ(partial_stats.bytes_read_from_remote, 64_kb - 100);
+ EXPECT_EQ(partial_stats.bytes_read_from_peer, 0);
+ EXPECT_EQ(partial_stats.num_local_io_total, 1);
+ EXPECT_EQ(partial_stats.num_remote_io_total, 1);
+ EXPECT_EQ(partial_stats.num_peer_io_total, 0);
+ auto after_partial_read = get_file_cache_metric_snapshot();
+ expect_file_cache_metric_delta(before_partial_read, after_partial_read,
100, 64_kb - 100, 0);
EXPECT_EQ(cache->_cur_cache_size, 2097152);
EXPECT_EQ(g_read_cache_direct_partial_num.get_value() -
org_g_read_cache_direct_partial_num, 1);
EXPECT_EQ(g_read_cache_direct_partial_bytes.get_value() -
org_g_read_cache_direct_partial_bytes,
diff --git
a/regression-test/suites/cloud_p0/cache/warm_up/test_file_cache_warmup_read_metrics_docker.groovy
b/regression-test/suites/cloud_p0/cache/warm_up/test_file_cache_warmup_read_metrics_docker.groovy
new file mode 100644
index 00000000000..e2fc6d6fe65
--- /dev/null
+++
b/regression-test/suites/cloud_p0/cache/warm_up/test_file_cache_warmup_read_metrics_docker.groovy
@@ -0,0 +1,457 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import groovy.json.JsonSlurper
+import java.net.ServerSocket
+
+suite('test_file_cache_warmup_read_metrics_docker', 'docker') {
+ if (!isCloudMode()) {
+ return
+ }
+
+ def allocatePort = {
+ def socket = new ServerSocket(0)
+ def port = socket.localPort
+ socket.close()
+ return port
+ }
+ def minioPort = allocatePort()
+ def minioBucket = "test-bucket"
+ def minioContainer =
"doris-file-cache-warmup-metrics-minio-${System.currentTimeMillis()}"
+
+ def options = new ClusterOptions()
+ options.feConfigs += [
+ 'cloud_cluster_check_interval_second=1',
+ 'cloud_tablet_rebalancer_interval_second=1',
+ 'fetch_cluster_cache_hotspot_interval_ms=1000',
+ 'heartbeat_interval_second=1',
+ 'cloud_warm_up_for_rebalance_type=async_warmup',
+ 'cloud_pre_heating_time_limit_sec=180',
+ 'auto_check_statistics_in_minutes=60',
+ ]
+ options.beConfigs += [
+ 'file_cache_enter_disk_resource_limit_mode_percent=99',
+ 'enable_evict_file_cache_in_advance=false',
+ 'file_cache_background_monitor_interval_ms=1000',
+ 'report_tablet_interval_seconds=1',
+ 'schedule_sync_tablets_interval_s=18000',
+ 'disable_auto_compaction=true',
+ 'disable_segment_cache=true',
+ 'disable_storage_page_cache=true',
+ 'enable_cache_read_from_peer=false',
+ ]
+ options.cloudStoreConfigs += [
+ 'DORIS_CLOUD_USER=minio',
+ 'DORIS_CLOUD_AK=minioadmin',
+ 'DORIS_CLOUD_SK=minioadmin',
+ "DORIS_CLOUD_BUCKET=${minioBucket}",
+ "DORIS_CLOUD_ENDPOINT=host.docker.internal:${minioPort}",
+ "DORIS_CLOUD_EXTERNAL_ENDPOINT=host.docker.internal:${minioPort}",
+ 'DORIS_CLOUD_REGION=us-east-1',
+ 'DORIS_CLOUD_PROVIDER=S3',
+ ]
+ options.extraHosts += [
+ 'host.docker.internal:host-gateway',
+ "${minioBucket}.host.docker.internal:host-gateway",
+ ]
+ options.setFeNum(1)
+ options.setBeNum(1)
+ options.cloudMode = true
+
+ def computeCluster = "compute_cluster"
+ def sourceCluster = "metrics_warmup_source"
+ def targetCluster = "metrics_warmup_target"
+ def payload =
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
+
+ def waitForCondition = { condition, timeoutMs, message ->
+ long start = System.currentTimeMillis()
+ while (System.currentTimeMillis() - start < timeoutMs) {
+ if (condition()) {
+ return
+ }
+ sleep(1000)
+ }
+ if (!condition()) {
+ throw new RuntimeException(message)
+ }
+ }
+
+ def startMinio = {
+ cmd """docker rm -f ${minioContainer} >/dev/null 2>&1 || true"""
+ cmd """
+ docker run -d --name ${minioContainer} \
+ -p ${minioPort}:9000 \
+ -e MINIO_ROOT_USER=minioadmin \
+ -e MINIO_ROOT_PASSWORD=minioadmin \
+ -e MINIO_DOMAIN=host.docker.internal \
+ minio/minio:RELEASE.2024-11-07T00-52-20Z \
+ server /data --console-address ':9001'
+ """
+ waitForCondition({
+ try {
+ new
URL("http://127.0.0.1:${minioPort}/minio/health/ready").text
+ return true
+ } catch (Throwable ignored) {
+ return false
+ }
+ }, 60000, "MinIO did not become ready")
+ cmd """
+ docker exec ${minioContainer} sh -c \
+ 'mc alias set local http://localhost:9000 minioadmin
minioadmin && mc mb -p local/${minioBucket}'
+ """
+ }
+
+ def stopMinio = {
+ cmd """docker rm -f ${minioContainer} >/dev/null 2>&1 || true"""
+ }
+
+ def getClusterBackends = { clusterName ->
+ def backends = sql """SHOW BACKENDS"""
+ def clusterBackends = backends.findAll {
+ it[19].contains("""\"compute_group_name\" : \"${clusterName}\"""")
+ }
+ assertTrue(clusterBackends.size() > 0, "No backend found for cluster
${clusterName}")
+ return clusterBackends
+ }
+
+ def getPromMetric = { ip, httpPort, name ->
+ def metrics = new URL("http://${ip}:${httpPort}/metrics").text
+ def matcher = metrics =~ ~"(?m)^${name}\\s+([0-9]+(?:\\.[0-9]+)?)"
+ if (matcher.find()) {
+ return new BigDecimal(matcher[0][1]).longValue()
+ }
+ logger.info("${name} not found in /metrics for ${ip}:${httpPort},
treat it as 0")
+ return 0L
+ }
+
+ def getBrpcMetric = { ip, brpcPort, name ->
+ def metrics = new URL("http://${ip}:${brpcPort}/brpc_metrics").text
+ def matcher = metrics =~ ~"(?m)^.*${name}\\s+([0-9]+)"
+ if (matcher.find()) {
+ return matcher[0][1] as long
+ }
+ logger.info("${name} not found in /brpc_metrics for ${ip}:${brpcPort},
treat it as 0")
+ return 0L
+ }
+
+ def getBackendReadMetrics = { be ->
+ def ip = be[1]
+ def httpPort = be[4]
+ return [
+ total : getPromMetric(ip, httpPort,
"doris_be_num_io_bytes_read_total"),
+ local : getPromMetric(ip, httpPort,
"doris_be_num_io_bytes_read_from_cache"),
+ remote: getPromMetric(ip, httpPort,
"doris_be_num_io_bytes_read_from_remote"),
+ peer : getPromMetric(ip, httpPort,
"doris_be_num_io_bytes_read_from_peer"),
+ ]
+ }
+
+ def getClusterReadMetrics = { clusterName ->
+ def result = [total: 0L, local: 0L, remote: 0L, peer: 0L]
+ getClusterBackends(clusterName).each { be ->
+ def metrics = getBackendReadMetrics(be)
+ result.total += metrics.total
+ result.local += metrics.local
+ result.remote += metrics.remote
+ result.peer += metrics.peer
+ }
+ return result
+ }
+
+ def getMetricDelta = { before, after ->
+ return [
+ total : after.total - before.total,
+ local : after.local - before.local,
+ remote: after.remote - before.remote,
+ peer : after.peer - before.peer,
+ ]
+ }
+
+ def assertMetricInvariant = { metrics ->
+ assertEquals(metrics.total, metrics.local + metrics.remote +
metrics.peer)
+ }
+
+ def assertNoReadMetricDelta = { before, after, label ->
+ def delta = getMetricDelta(before, after)
+ logger.info("${label} read metric delta: ${delta}")
+ assertEquals(0L, delta.total)
+ assertEquals(0L, delta.local)
+ assertEquals(0L, delta.remote)
+ assertEquals(0L, delta.peer)
+ }
+
+ def clearFileCache = { ip, httpPort ->
+ def response = new
URL("http://${ip}:${httpPort}/api/file_cache?op=clear&sync=true").text
+ def json = new JsonSlurper().parseText(response)
+ if (json.status != "OK") {
+ throw new RuntimeException("Clear cache on ${ip}:${httpPort}
failed: ${json.status}")
+ }
+ }
+
+ def clearFileCacheOnCluster = { clusterName ->
+ getClusterBackends(clusterName).each { be ->
+ clearFileCache(be[1], be[4])
+ }
+ sleep(5000)
+ }
+
+ def getBackendCacheSize = { be ->
+ return getBrpcMetric(be[1], be[5], "cache_cache_size")
+ }
+
+ def getClusterCacheSizeSum = { clusterName ->
+ long sum = 0
+ getClusterBackends(clusterName).each { be ->
+ sum += getBackendCacheSize(be)
+ }
+ return sum
+ }
+
+ def getClusterBrpcMetricSum = { clusterName, metricName ->
+ long sum = 0
+ getClusterBackends(clusterName).each { be ->
+ sum += getBrpcMetric(be[1], be[5], metricName)
+ }
+ return sum
+ }
+
+ def createTable = { table, buckets ->
+ sql """DROP TABLE IF EXISTS ${table}"""
+ sql """
+ CREATE TABLE ${table} (
+ k1 INT,
+ v1 VARCHAR(4096)
+ )
+ DUPLICATE KEY(k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS ${buckets}
+ PROPERTIES (
+ "replication_num" = "1",
+ "file_cache_ttl_seconds" = "3600"
+ )
+ """
+ }
+
+ def loadRows = { table, rowStart, rowCount ->
+ int batchSize = 100
+ for (int begin = 0; begin < rowCount; begin += batchSize) {
+ int end = Math.min(rowCount, begin + batchSize)
+ def values = (begin..<end).collect { idx ->
+ int key = rowStart + idx
+ return "(${key}, '${payload}_${key}_${payload}')"
+ }.join(",")
+ sql """INSERT INTO ${table} VALUES ${values}"""
+ }
+ sql """sync"""
+ }
+
+ def queryTable = { table ->
+ def result = sql """SELECT SUM(LENGTH(v1)) FROM ${table} WHERE k1 >=
0"""
+ assertTrue((result[0][0] as String).toLong() > 0)
+ }
+
+ def waitWarmupFinished = { jobId ->
+ waitForCondition({
+ def jobInfo = sql """SHOW WARM UP JOB WHERE ID = ${jobId}"""
+ if (jobInfo.size() == 0) {
+ return false
+ }
+ def state = jobInfo[0][3].toString()
+ if (state == "CANCELLED") {
+ throw new RuntimeException("Warm up job ${jobId} was
cancelled")
+ }
+ return state == "FINISHED"
+ }, 120000, "Warm up job ${jobId} did not finish")
+ }
+
+ def cancelWarmupJob = { jobId ->
+ sql """CANCEL WARM UP JOB WHERE ID = ${jobId}"""
+ def jobInfo = sql """SHOW WARM UP JOB WHERE ID = ${jobId}"""
+ assertEquals("CANCELLED", jobInfo[0][3])
+ }
+
+ try {
+ startMinio()
+
+ docker(options) {
+ cluster.addBackend(1, sourceCluster)
+ cluster.addBackend(1, targetCluster)
+
+ sql """SET enable_file_cache = true"""
+ sql """SET enable_sql_cache = false"""
+ sql """SET enable_query_cache = false"""
+
+ sql """use @${computeCluster}"""
+ def manualTable = "test_file_cache_metrics_manual"
+ createTable(manualTable, 1)
+ loadRows(manualTable, 1, 300)
+ clearFileCacheOnCluster(computeCluster)
+
+ def beforeQueryMiss = getClusterReadMetrics(computeCluster)
+ queryTable(manualTable)
+ waitForCondition({
+ def delta = getMetricDelta(beforeQueryMiss,
getClusterReadMetrics(computeCluster))
+ return delta.total > 0 && delta.remote > 0
+ }, 30000, "Normal query miss did not increase global remote read
metrics")
+ def queryMissDelta = getMetricDelta(beforeQueryMiss,
getClusterReadMetrics(computeCluster))
+ logger.info("query miss read metric delta: ${queryMissDelta}")
+ assertMetricInvariant(queryMissDelta)
+ assertTrue(queryMissDelta.remote > 0)
+
+ clearFileCacheOnCluster(computeCluster)
+ def beforeManualWarmup = getClusterReadMetrics(computeCluster)
+ def manualJob = sql """WARM UP CLUSTER ${computeCluster} WITH
TABLE ${manualTable}"""
+ waitWarmupFinished(manualJob[0][0])
+ waitForCondition({
+ return getClusterCacheSizeSum(computeCluster) > 0
+ }, 30000, "Manual table warm up did not populate file cache")
+ def afterManualWarmup = getClusterReadMetrics(computeCluster)
+ assertNoReadMetricDelta(beforeManualWarmup, afterManualWarmup,
"manual table warm up")
+
+ def beforeQueryHit = getClusterReadMetrics(computeCluster)
+ queryTable(manualTable)
+ waitForCondition({
+ def delta = getMetricDelta(beforeQueryHit,
getClusterReadMetrics(computeCluster))
+ return delta.total > 0 && delta.local > 0
+ }, 30000, "Query after warm up did not increase global local read
metrics")
+ def queryHitDelta = getMetricDelta(beforeQueryHit,
getClusterReadMetrics(computeCluster))
+ logger.info("query hit read metric delta: ${queryHitDelta}")
+ assertMetricInvariant(queryHitDelta)
+ assertTrue(queryHitDelta.local > 0)
+
+ sql """use @${sourceCluster}"""
+ sql """TRUNCATE TABLE __internal_schema.cloud_cache_hotspot"""
+ def periodicTable = "test_file_cache_metrics_periodic"
+ createTable(periodicTable, 1)
+ loadRows(periodicTable, 1000, 300)
+ clearFileCacheOnCluster(sourceCluster)
+ clearFileCacheOnCluster(targetCluster)
+
+ def beforePeriodicWarmup = getClusterReadMetrics(targetCluster)
+ def periodicJob = sql """
+ WARM UP CLUSTER ${targetCluster} WITH CLUSTER ${sourceCluster}
+ PROPERTIES (
+ "sync_mode" = "periodic",
+ "sync_interval_sec" = "1"
+ )
+ """
+ for (int i = 0; i < 120; i++) {
+ queryTable(periodicTable)
+ }
+ waitForCondition({
+ return getClusterCacheSizeSum(sourceCluster) > 0 &&
+ getClusterCacheSizeSum(targetCluster) > 0
+ }, 90000, "Periodic warm up did not populate target file cache")
+ def afterPeriodicWarmup = getClusterReadMetrics(targetCluster)
+ assertNoReadMetricDelta(beforePeriodicWarmup, afterPeriodicWarmup,
+ "periodic cluster warm up")
+ cancelWarmupJob(periodicJob[0][0])
+
+ def eventTable = "test_file_cache_metrics_event"
+ createTable(eventTable, 1)
+ clearFileCacheOnCluster(sourceCluster)
+ clearFileCacheOnCluster(targetCluster)
+ def beforeEventWarmup = getClusterReadMetrics(targetCluster)
+ def eventJob = sql """
+ WARM UP CLUSTER ${targetCluster} WITH CLUSTER ${sourceCluster}
+ PROPERTIES (
+ "sync_mode" = "event_driven",
+ "sync_event" = "load"
+ )
+ """
+ waitForCondition({
+ def jobInfo = sql """SHOW WARM UP JOB WHERE ID =
${eventJob[0][0]}"""
+ return jobInfo.size() > 0 && jobInfo[0][3].toString() in
["RUNNING", "PENDING"]
+ }, 30000, "Event-driven warm up job did not enter running state")
+ sleep(15000)
+ def beforeEventSubmitted = getClusterBrpcMetricSum(targetCluster,
+ "file_cache_event_driven_warm_up_submitted_segment_num")
+ def beforeEventFinished = getClusterBrpcMetricSum(targetCluster,
+ "file_cache_event_driven_warm_up_finished_segment_num")
+ def beforeEventFailed = getClusterBrpcMetricSum(targetCluster,
+ "file_cache_event_driven_warm_up_failed_segment_num")
+ loadRows(eventTable, 2000, 300)
+ waitForCondition({
+ def submitted = getClusterBrpcMetricSum(targetCluster,
+
"file_cache_event_driven_warm_up_submitted_segment_num")
+ def finished = getClusterBrpcMetricSum(targetCluster,
+ "file_cache_event_driven_warm_up_finished_segment_num")
+ return submitted > beforeEventSubmitted && finished >=
submitted
+ }, 90000, "Event-driven warm up did not finish target segment
downloads")
+ def afterEventSubmitted = getClusterBrpcMetricSum(targetCluster,
+ "file_cache_event_driven_warm_up_submitted_segment_num")
+ def afterEventFinished = getClusterBrpcMetricSum(targetCluster,
+ "file_cache_event_driven_warm_up_finished_segment_num")
+ def afterEventFailed = getClusterBrpcMetricSum(targetCluster,
+ "file_cache_event_driven_warm_up_failed_segment_num")
+ logger.info("event-driven warm up segment metrics: submitted
${beforeEventSubmitted}"
+ + " -> ${afterEventSubmitted}, finished
${beforeEventFinished}"
+ + " -> ${afterEventFinished}, failed ${beforeEventFailed}"
+ + " -> ${afterEventFailed}")
+ assertEquals(beforeEventFailed, afterEventFailed)
+ def afterEventWarmup = getClusterReadMetrics(targetCluster)
+ assertNoReadMetricDelta(beforeEventWarmup, afterEventWarmup,
+ "event-driven cluster warm up")
+ cancelWarmupJob(eventJob[0][0])
+
+ sql """use @${computeCluster}"""
+ def rebalanceTable = "test_file_cache_metrics_rebalance"
+ createTable(rebalanceTable, 8)
+ loadRows(rebalanceTable, 3000, 500)
+ clearFileCacheOnCluster(computeCluster)
+ def beforeRebalanceSourceQuery =
getClusterReadMetrics(computeCluster)
+ queryTable(rebalanceTable)
+ waitForCondition({
+ def delta = getMetricDelta(beforeRebalanceSourceQuery,
+ getClusterReadMetrics(computeCluster))
+ return delta.total > 0 && delta.remote > 0
+ }, 30000, "Rebalance source query did not populate source file
cache")
+
+ def beforeBackendIds = getClusterBackends(computeCluster).collect
{ it[0].toString() } as Set
+ def beforeRebalanceWarmup = getClusterReadMetrics(computeCluster)
+ cluster.addBackend(1, computeCluster)
+ def newBackendHolder = [:]
+ waitForCondition({
+ def added = getClusterBackends(computeCluster).find {
+ !beforeBackendIds.contains(it[0].toString())
+ }
+ if (added != null) {
+ newBackendHolder.be = added
+ return true
+ }
+ return false
+ }, 30000, "New backend was not added to ${computeCluster}")
+
+ waitForCondition({
+ def distribution = sql_return_maparray """ADMIN SHOW REPLICA
DISTRIBUTION FROM ${rebalanceTable}"""
+ return distribution.any {
+ it.BackendId.toString() ==
newBackendHolder.be[0].toString() &&
+ Integer.valueOf(it.ReplicaNum.toString()) > 0
+ }
+ }, 180000, "Rebalance did not move any replica to the new backend")
+
+ waitForCondition({
+ return getBackendCacheSize(newBackendHolder.be) > 0
+ }, 180000, "Rebalance warm up did not populate file cache on the
new backend")
+
+ def afterRebalanceWarmup = getClusterReadMetrics(computeCluster)
+ assertNoReadMetricDelta(beforeRebalanceWarmup,
afterRebalanceWarmup,
+ "rebalance warm up")
+ }
+ } finally {
+ stopMinio()
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]