This is an automated email from the ASF dual-hosted git repository.

freemandealer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ec8f7d0d61 [fix](filecache) exclude warmup reads from file cache hit ratio metrics (#63394)
7ec8f7d0d61 is described below

commit 7ec8f7d0d61d1ab47353ce095ac5b879a28fb15e
Author: zhengyu <[email protected]>
AuthorDate: Tue Jun 2 18:03:22 2026 +0800

    [fix](filecache) exclude warmup reads from file cache hit ratio metrics (#63394)
    
    File cache hit ratio metrics are derived from global
    file cache read bytes, but warmup reads from manual warmup, periodic
    warmup, event-driven warmup, and rebalance-triggered warmup used to
    update the same counters as query reads. This polluted the query hit
    ratio. Mixed hit/miss reads could also be attributed to one source for
    the whole request. This change skips warmup updates to global file cache
    read metrics while preserving per-IOContext profile stats, records
    local/remote/peer bytes by actual returned bytes, and avoids updating
    metrics for failed reads. It also fixes direct-read partial continuation
    and refreshes the no-warmup hit ratio whenever any blocks were read, so a
    miss-only interval now reports 0 instead of keeping a stale value.
---
 be/src/io/cache/block_file_cache.cpp               |   6 +-
 be/src/io/cache/block_file_cache_profile.cpp       |  37 ++
 be/src/io/cache/block_file_cache_profile.h         |   7 +-
 be/src/io/cache/cached_remote_file_reader.cpp      |  82 ++--
 be/src/io/cache/file_cache_common.h                |   3 +
 be/src/storage/storage_engine.cpp                  |   1 +
 .../block_file_cache_profile_reporter_test.cpp     | 139 +++++++
 be/test/io/cache/block_file_cache_test.cpp         | 365 ++++++++++++++--
 ...st_file_cache_warmup_read_metrics_docker.groovy | 457 +++++++++++++++++++++
 9 files changed, 1039 insertions(+), 58 deletions(-)

diff --git a/be/src/io/cache/block_file_cache.cpp 
b/be/src/io/cache/block_file_cache.cpp
index 9f1fc29f1f1..dfbad8a63ed 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -2053,16 +2053,16 @@ void BlockFileCache::run_background_monitor() {
                                          
(double)_num_read_blocks_1h->get_value());
             }
 
-            if (_no_warmup_num_hit_blocks->get_value() > 0) {
+            if (_no_warmup_num_read_blocks->get_value() > 0) {
                 
_no_warmup_hit_ratio->set_value((double)_no_warmup_num_hit_blocks->get_value() /
                                                 
(double)_no_warmup_num_read_blocks->get_value());
             }
-            if (_no_warmup_num_hit_blocks_5m && 
_no_warmup_num_hit_blocks_5m->get_value() > 0) {
+            if (_no_warmup_num_read_blocks_5m && 
_no_warmup_num_read_blocks_5m->get_value() > 0) {
                 _no_warmup_hit_ratio_5m->set_value(
                         (double)_no_warmup_num_hit_blocks_5m->get_value() /
                         (double)_no_warmup_num_read_blocks_5m->get_value());
             }
-            if (_no_warmup_num_hit_blocks_1h && 
_no_warmup_num_hit_blocks_1h->get_value() > 0) {
+            if (_no_warmup_num_read_blocks_1h && 
_no_warmup_num_read_blocks_1h->get_value() > 0) {
                 _no_warmup_hit_ratio_1h->set_value(
                         (double)_no_warmup_num_hit_blocks_1h->get_value() /
                         (double)_no_warmup_num_read_blocks_1h->get_value());
diff --git a/be/src/io/cache/block_file_cache_profile.cpp 
b/be/src/io/cache/block_file_cache_profile.cpp
index 692174dbbcb..8f9c167c998 100644
--- a/be/src/io/cache/block_file_cache_profile.cpp
+++ b/be/src/io/cache/block_file_cache_profile.cpp
@@ -65,6 +65,43 @@ void FileCacheMetrics::update_metrics_callback() {
             stats->num_io_bytes_read_from_peer);
 }
 
+FileCacheStatistics diff_file_cache_statistics(const FileCacheStatistics& 
current,
+                                               const FileCacheStatistics& 
previous) {
+    FileCacheStatistics diff;
+#define SUBTRACT_FIELD(field) diff.field = current.field - previous.field
+    SUBTRACT_FIELD(num_local_io_total);
+    SUBTRACT_FIELD(num_remote_io_total);
+    SUBTRACT_FIELD(num_peer_io_total);
+    SUBTRACT_FIELD(local_io_timer);
+    SUBTRACT_FIELD(bytes_read_from_local);
+    SUBTRACT_FIELD(bytes_read_from_remote);
+    SUBTRACT_FIELD(bytes_read_from_peer);
+    SUBTRACT_FIELD(remote_io_timer);
+    SUBTRACT_FIELD(peer_io_timer);
+    SUBTRACT_FIELD(remote_wait_timer);
+    SUBTRACT_FIELD(write_cache_io_timer);
+    SUBTRACT_FIELD(bytes_write_into_cache);
+    SUBTRACT_FIELD(num_skip_cache_io_total);
+    SUBTRACT_FIELD(read_cache_file_directly_timer);
+    SUBTRACT_FIELD(cache_get_or_set_timer);
+    SUBTRACT_FIELD(lock_wait_timer);
+    SUBTRACT_FIELD(get_timer);
+    SUBTRACT_FIELD(set_timer);
+
+    SUBTRACT_FIELD(inverted_index_num_local_io_total);
+    SUBTRACT_FIELD(inverted_index_num_remote_io_total);
+    SUBTRACT_FIELD(inverted_index_num_peer_io_total);
+    SUBTRACT_FIELD(inverted_index_bytes_read_from_local);
+    SUBTRACT_FIELD(inverted_index_bytes_read_from_remote);
+    SUBTRACT_FIELD(inverted_index_bytes_read_from_peer);
+    SUBTRACT_FIELD(inverted_index_local_io_timer);
+    SUBTRACT_FIELD(inverted_index_remote_io_timer);
+    SUBTRACT_FIELD(inverted_index_peer_io_timer);
+    SUBTRACT_FIELD(inverted_index_io_timer);
+#undef SUBTRACT_FIELD
+    return diff;
+}
+
 FileCacheProfileReporter::FileCacheProfileReporter(RuntimeProfile* profile) {
     static const char* cache_profile = "FileCache";
     ADD_TIMER_WITH_LEVEL(profile, cache_profile, 2);
diff --git a/be/src/io/cache/block_file_cache_profile.h 
b/be/src/io/cache/block_file_cache_profile.h
index 5fdb82fbd61..6c95e49791c 100644
--- a/be/src/io/cache/block_file_cache_profile.h
+++ b/be/src/io/cache/block_file_cache_profile.h
@@ -52,9 +52,9 @@ public:
     }
 
     void update(FileCacheStatistics* stats);
+    std::shared_ptr<AtomicStatistics> report();
 
 private:
-    std::shared_ptr<AtomicStatistics> report();
     void register_entity();
     void update_metrics_callback();
 
@@ -64,6 +64,9 @@ private:
     std::shared_ptr<AtomicStatistics> _statistics;
 };
 
+FileCacheStatistics diff_file_cache_statistics(const FileCacheStatistics& 
current,
+                                               const FileCacheStatistics& 
previous);
+
 struct FileCacheProfileReporter {
     RuntimeProfile::Counter* num_local_io_total = nullptr;
     RuntimeProfile::Counter* num_remote_io_total = nullptr;
@@ -100,4 +103,4 @@ struct FileCacheProfileReporter {
 };
 
 } // namespace io
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/io/cache/cached_remote_file_reader.cpp 
b/be/src/io/cache/cached_remote_file_reader.cpp
index c7f6ddffc5f..7c297737d60 100644
--- a/be/src/io/cache/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/cached_remote_file_reader.cpp
@@ -297,6 +297,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, 
Slice result, size_t*
 
     ReadStatistics stats;
     stats.bytes_read += bytes_req;
+    bool read_success = false;
     MonotonicStopWatch read_at_sw;
     read_at_sw.start();
     auto defer_func = [&](int*) {
@@ -315,10 +316,15 @@ Status CachedRemoteFileReader::read_at_impl(size_t 
offset, Slice result, size_t*
         if (is_dryrun) {
             return;
         }
-        // update stats increment in this reading procedure for file cache 
metrics
-        FileCacheStatistics fcache_stats_increment;
-        _update_stats(stats, &fcache_stats_increment, 
io_ctx->is_inverted_index);
-        io::FileCacheMetrics::instance().update(&fcache_stats_increment);
+        if (!read_success) {
+            return;
+        }
+        if (!io_ctx->is_warmup) {
+            // update stats increment in this reading procedure for file cache 
metrics
+            FileCacheStatistics fcache_stats_increment;
+            _update_stats(stats, &fcache_stats_increment, 
io_ctx->is_inverted_index);
+            io::FileCacheMetrics::instance().update(&fcache_stats_increment);
+        }
         if (io_ctx->file_cache_stats) {
             // update stats in io_ctx, for query profile
             _update_stats(stats, io_ctx->file_cache_stats, 
io_ctx->is_inverted_index);
@@ -355,6 +361,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, 
Slice result, size_t*
                                  .ok()) { //TODO: maybe read failed because 
block evict, should handle error
                         break;
                     }
+                    stats.bytes_read_from_local += reserve_bytes;
                 }
                 _cache->add_need_update_lru_block(iter->second);
                 need_read_size -= reserve_bytes;
@@ -367,6 +374,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, 
Slice result, size_t*
                 stats.hit_cache = true;
                 g_read_cache_direct_whole_num << 1;
                 g_read_cache_direct_whole_bytes << bytes_req;
+                read_success = true;
                 return Status::OK();
             } else {
                 g_read_cache_direct_partial_num << 1;
@@ -442,6 +450,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, 
Slice result, size_t*
         // Determine read type and execute remote read
         RETURN_IF_ERROR(
                 _execute_remote_read(empty_blocks, empty_start, size, buffer, 
stats, io_ctx));
+        bool empty_blocks_from_peer_cache = stats.from_peer_cache;
 
         {
             SCOPED_CONCURRENCY_COUNT(
@@ -476,6 +485,11 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, 
Slice result, size_t*
             size_t copy_size = copy_right_offset - copy_left_offset + 1;
             memcpy(dst, src, copy_size);
             indirect_read_bytes += copy_size;
+            if (empty_blocks_from_peer_cache) {
+                stats.bytes_read_from_peer += copy_size;
+            } else {
+                stats.bytes_read_from_remote += copy_size;
+            }
         }
     }
 
@@ -536,7 +550,10 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, 
Slice result, size_t*
                             
ConcurrencyStatsManager::instance().cached_remote_reader_local_read);
                     st = block->read(Slice(result.data + (current_offset - 
offset), read_size),
                                      file_offset);
-                    indirect_read_bytes += read_size;
+                    if (st.ok()) {
+                        indirect_read_bytes += read_size;
+                        stats.bytes_read_from_local += read_size;
+                    }
                 }
             }
             if (!st || block_state != FileBlock::State::DOWNLOADED) {
@@ -567,6 +584,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, 
Slice result, size_t*
                             &nest_bytes_read));
                     indirect_read_bytes += read_size;
                     DCHECK(nest_bytes_read == read_size);
+                    stats.bytes_read_from_remote += nest_bytes_read;
                 }
             }
         }
@@ -580,6 +598,12 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, 
Slice result, size_t*
     g_read_cache_indirect_total_bytes << *bytes_read;
 
     DCHECK(*bytes_read == bytes_req);
+    if (!is_dryrun) {
+        DCHECK_EQ(stats.bytes_read_from_local + stats.bytes_read_from_remote +
+                          stats.bytes_read_from_peer,
+                  bytes_req);
+    }
+    read_success = true;
     return Status::OK();
 }
 
@@ -589,19 +613,19 @@ void CachedRemoteFileReader::_update_stats(const 
ReadStatistics& read_stats,
     if (statis == nullptr) {
         return;
     }
-    if (read_stats.hit_cache) {
+    if (read_stats.bytes_read_from_local > 0) {
         statis->num_local_io_total++;
-        statis->bytes_read_from_local += read_stats.bytes_read;
-    } else {
-        if (read_stats.from_peer_cache) {
-            statis->num_peer_io_total++;
-            statis->bytes_read_from_peer += read_stats.bytes_read;
-            statis->peer_io_timer += read_stats.peer_read_timer;
-        } else {
-            statis->num_remote_io_total++;
-            statis->bytes_read_from_remote += read_stats.bytes_read;
-            statis->remote_io_timer += read_stats.remote_read_timer;
-        }
+        statis->bytes_read_from_local += read_stats.bytes_read_from_local;
+    }
+    if (read_stats.bytes_read_from_remote > 0) {
+        statis->num_remote_io_total++;
+        statis->bytes_read_from_remote += read_stats.bytes_read_from_remote;
+        statis->remote_io_timer += read_stats.remote_read_timer;
+    }
+    if (read_stats.bytes_read_from_peer > 0) {
+        statis->num_peer_io_total++;
+        statis->bytes_read_from_peer += read_stats.bytes_read_from_peer;
+        statis->peer_io_timer += read_stats.peer_read_timer;
     }
     statis->remote_wait_timer += read_stats.remote_wait_timer;
     statis->local_io_timer += read_stats.local_read_timer;
@@ -616,19 +640,19 @@ void CachedRemoteFileReader::_update_stats(const 
ReadStatistics& read_stats,
     statis->set_timer += read_stats.set_timer;
 
     if (is_inverted_index) {
-        if (read_stats.hit_cache) {
+        if (read_stats.bytes_read_from_local > 0) {
             statis->inverted_index_num_local_io_total++;
-            statis->inverted_index_bytes_read_from_local += 
read_stats.bytes_read;
-        } else {
-            if (read_stats.from_peer_cache) {
-                statis->inverted_index_num_peer_io_total++;
-                statis->inverted_index_bytes_read_from_peer += 
read_stats.bytes_read;
-                statis->inverted_index_peer_io_timer += 
read_stats.peer_read_timer;
-            } else {
-                statis->inverted_index_num_remote_io_total++;
-                statis->inverted_index_bytes_read_from_remote += 
read_stats.bytes_read;
-                statis->inverted_index_remote_io_timer += 
read_stats.remote_read_timer;
-            }
+            statis->inverted_index_bytes_read_from_local += 
read_stats.bytes_read_from_local;
+        }
+        if (read_stats.bytes_read_from_remote > 0) {
+            statis->inverted_index_num_remote_io_total++;
+            statis->inverted_index_bytes_read_from_remote += 
read_stats.bytes_read_from_remote;
+            statis->inverted_index_remote_io_timer += 
read_stats.remote_read_timer;
+        }
+        if (read_stats.bytes_read_from_peer > 0) {
+            statis->inverted_index_num_peer_io_total++;
+            statis->inverted_index_bytes_read_from_peer += 
read_stats.bytes_read_from_peer;
+            statis->inverted_index_peer_io_timer += read_stats.peer_read_timer;
         }
         statis->inverted_index_local_io_timer += read_stats.local_read_timer;
     }
diff --git a/be/src/io/cache/file_cache_common.h 
b/be/src/io/cache/file_cache_common.h
index 43510293af8..8fde90abc0a 100644
--- a/be/src/io/cache/file_cache_common.h
+++ b/be/src/io/cache/file_cache_common.h
@@ -71,6 +71,9 @@ struct ReadStatistics {
     bool from_peer_cache = false;
     bool skip_cache = false;
     int64_t bytes_read = 0;
+    int64_t bytes_read_from_local = 0;
+    int64_t bytes_read_from_remote = 0;
+    int64_t bytes_read_from_peer = 0;
     int64_t bytes_write_into_file_cache = 0;
     int64_t remote_read_timer = 0;
     int64_t peer_read_timer = 0;
diff --git a/be/src/storage/storage_engine.cpp 
b/be/src/storage/storage_engine.cpp
index 92f919f9831..9f5b545871c 100644
--- a/be/src/storage/storage_engine.cpp
+++ b/be/src/storage/storage_engine.cpp
@@ -257,6 +257,7 @@ StorageEngine::StorageEngine(const EngineOptions& options)
 }
 
 StorageEngine::~StorageEngine() {
+    DEREGISTER_HOOK_METRIC(unused_rowsets_count);
     stop();
 }
 
diff --git a/be/test/io/cache/block_file_cache_profile_reporter_test.cpp 
b/be/test/io/cache/block_file_cache_profile_reporter_test.cpp
new file mode 100644
index 00000000000..e74ad758ac1
--- /dev/null
+++ b/be/test/io/cache/block_file_cache_profile_reporter_test.cpp
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "io/cache/block_file_cache_profile.h"
+
+namespace doris {
+namespace {
+
+io::FileCacheStatistics make_file_cache_stats(int64_t multiplier) {
+    io::FileCacheStatistics stats;
+    stats.num_local_io_total = multiplier;
+    stats.num_remote_io_total = multiplier * 2;
+    stats.num_peer_io_total = multiplier * 3;
+    stats.local_io_timer = multiplier * 4;
+    stats.bytes_read_from_local = multiplier * 5;
+    stats.bytes_read_from_remote = multiplier * 6;
+    stats.bytes_read_from_peer = multiplier * 7;
+    stats.remote_io_timer = multiplier * 8;
+    stats.peer_io_timer = multiplier * 9;
+    stats.remote_wait_timer = multiplier * 10;
+    stats.write_cache_io_timer = multiplier * 11;
+    stats.bytes_write_into_cache = multiplier * 12;
+    stats.num_skip_cache_io_total = multiplier * 13;
+    stats.read_cache_file_directly_timer = multiplier * 14;
+    stats.cache_get_or_set_timer = multiplier * 15;
+    stats.lock_wait_timer = multiplier * 16;
+    stats.get_timer = multiplier * 17;
+    stats.set_timer = multiplier * 18;
+    stats.inverted_index_num_local_io_total = multiplier * 19;
+    stats.inverted_index_num_remote_io_total = multiplier * 20;
+    stats.inverted_index_num_peer_io_total = multiplier * 21;
+    stats.inverted_index_bytes_read_from_local = multiplier * 22;
+    stats.inverted_index_bytes_read_from_remote = multiplier * 23;
+    stats.inverted_index_bytes_read_from_peer = multiplier * 24;
+    stats.inverted_index_local_io_timer = multiplier * 25;
+    stats.inverted_index_remote_io_timer = multiplier * 26;
+    stats.inverted_index_peer_io_timer = multiplier * 27;
+    stats.inverted_index_io_timer = multiplier * 28;
+    return stats;
+}
+
+void expect_file_cache_stats_eq(const io::FileCacheStatistics& actual,
+                                const io::FileCacheStatistics& expected) {
+    EXPECT_EQ(actual.num_local_io_total, expected.num_local_io_total);
+    EXPECT_EQ(actual.num_remote_io_total, expected.num_remote_io_total);
+    EXPECT_EQ(actual.num_peer_io_total, expected.num_peer_io_total);
+    EXPECT_EQ(actual.local_io_timer, expected.local_io_timer);
+    EXPECT_EQ(actual.bytes_read_from_local, expected.bytes_read_from_local);
+    EXPECT_EQ(actual.bytes_read_from_remote, expected.bytes_read_from_remote);
+    EXPECT_EQ(actual.bytes_read_from_peer, expected.bytes_read_from_peer);
+    EXPECT_EQ(actual.remote_io_timer, expected.remote_io_timer);
+    EXPECT_EQ(actual.peer_io_timer, expected.peer_io_timer);
+    EXPECT_EQ(actual.remote_wait_timer, expected.remote_wait_timer);
+    EXPECT_EQ(actual.write_cache_io_timer, expected.write_cache_io_timer);
+    EXPECT_EQ(actual.bytes_write_into_cache, expected.bytes_write_into_cache);
+    EXPECT_EQ(actual.num_skip_cache_io_total, 
expected.num_skip_cache_io_total);
+    EXPECT_EQ(actual.read_cache_file_directly_timer, 
expected.read_cache_file_directly_timer);
+    EXPECT_EQ(actual.cache_get_or_set_timer, expected.cache_get_or_set_timer);
+    EXPECT_EQ(actual.lock_wait_timer, expected.lock_wait_timer);
+    EXPECT_EQ(actual.get_timer, expected.get_timer);
+    EXPECT_EQ(actual.set_timer, expected.set_timer);
+    EXPECT_EQ(actual.inverted_index_num_local_io_total, 
expected.inverted_index_num_local_io_total);
+    EXPECT_EQ(actual.inverted_index_num_remote_io_total,
+              expected.inverted_index_num_remote_io_total);
+    EXPECT_EQ(actual.inverted_index_num_peer_io_total, 
expected.inverted_index_num_peer_io_total);
+    EXPECT_EQ(actual.inverted_index_bytes_read_from_local,
+              expected.inverted_index_bytes_read_from_local);
+    EXPECT_EQ(actual.inverted_index_bytes_read_from_remote,
+              expected.inverted_index_bytes_read_from_remote);
+    EXPECT_EQ(actual.inverted_index_bytes_read_from_peer,
+              expected.inverted_index_bytes_read_from_peer);
+    EXPECT_EQ(actual.inverted_index_local_io_timer, 
expected.inverted_index_local_io_timer);
+    EXPECT_EQ(actual.inverted_index_remote_io_timer, 
expected.inverted_index_remote_io_timer);
+    EXPECT_EQ(actual.inverted_index_peer_io_timer, 
expected.inverted_index_peer_io_timer);
+    EXPECT_EQ(actual.inverted_index_io_timer, 
expected.inverted_index_io_timer);
+}
+
+} // namespace
+
+TEST(FileCacheProfileReporterTest, DiffReturnsFullStatsWhenPreviousIsZero) {
+    const auto current = make_file_cache_stats(3);
+
+    expect_file_cache_stats_eq(io::diff_file_cache_statistics(current, {}), 
current);
+}
+
+TEST(FileCacheProfileReporterTest, DiffReturnsOnlyIncrementalDelta) {
+    expect_file_cache_stats_eq(
+            io::diff_file_cache_statistics(make_file_cache_stats(5), 
make_file_cache_stats(3)),
+            make_file_cache_stats(2));
+}
+
+TEST(FileCacheProfileReporterTest, DiffReturnsZeroWithoutNewData) {
+    const auto current = make_file_cache_stats(4);
+
+    expect_file_cache_stats_eq(io::diff_file_cache_statistics(current, 
current),
+                               make_file_cache_stats(0));
+}
+
+TEST(FileCacheProfileReporterTest, 
ReporterAggregatesDeltaReportsToExactFinalTotals) {
+    auto profile = std::make_unique<RuntimeProfile>("test_profile");
+    io::FileCacheProfileReporter reporter(profile.get());
+
+    const auto after_first_report = make_file_cache_stats(4);
+    const auto after_second_report = make_file_cache_stats(7);
+    const auto first_delta = 
io::diff_file_cache_statistics(after_first_report, {});
+    reporter.update(&first_delta);
+
+    const auto second_delta =
+            io::diff_file_cache_statistics(after_second_report, 
after_first_report);
+    reporter.update(&second_delta);
+
+    EXPECT_EQ(profile->get_counter("BytesScannedFromCache")->value(),
+              after_second_report.bytes_read_from_local);
+    EXPECT_EQ(profile->get_counter("BytesScannedFromRemote")->value(),
+              after_second_report.bytes_read_from_remote);
+    EXPECT_EQ(profile->get_counter("BytesWriteIntoCache")->value(),
+              after_second_report.bytes_write_into_cache);
+    EXPECT_EQ(profile->get_counter("CacheGetOrSetTimer")->value(),
+              after_second_report.cache_get_or_set_timer);
+    EXPECT_EQ(profile->get_counter("LockWaitTimer")->value(), 
after_second_report.lock_wait_timer);
+}
+
+} // namespace doris
diff --git a/be/test/io/cache/block_file_cache_test.cpp 
b/be/test/io/cache/block_file_cache_test.cpp
index cf4087a2624..b57e1285f87 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -19,6 +19,7 @@
 // and modified by Doris
 
 #include "io/cache/block_file_cache_test_common.h"
+#include "io/fs/buffered_reader.h"
 #include "storage/olap_define.h"
 
 namespace doris::io {
@@ -43,6 +44,114 @@ fs::path caches_dir = fs::current_path() / "lru_cache_test";
 std::string cache_base_path = caches_dir / "cache1" / "";
 std::string tmp_file = caches_dir / "tmp_file";
 
+io::FileCacheSettings cached_remote_reader_cache_settings() {
+    io::FileCacheSettings settings;
+    settings.query_queue_size = 6291456;
+    settings.query_queue_elements = 6;
+    settings.index_queue_size = 1048576;
+    settings.index_queue_elements = 1;
+    settings.disposable_queue_size = 1048576;
+    settings.disposable_queue_elements = 1;
+    settings.capacity = 8388608;
+    settings.max_file_block_size = 1048576;
+    settings.max_query_cache_size = 0;
+    return settings;
+}
+
+void reset_file_cache_factory_for_test() {
+    FileCacheFactory::instance()->_caches.clear();
+    FileCacheFactory::instance()->_path_to_cache.clear();
+    FileCacheFactory::instance()->_capacity = 0;
+}
+
+Status create_cached_remote_reader_cache(const std::string& cache_path, 
BlockFileCache** cache) {
+    reset_file_cache_factory_for_test();
+    if (fs::exists(cache_path)) {
+        fs::remove_all(cache_path);
+    }
+    fs::create_directories(cache_path);
+
+    RETURN_IF_ERROR(FileCacheFactory::instance()->create_file_cache(
+            cache_path, cached_remote_reader_cache_settings()));
+    *cache = FileCacheFactory::instance()->_path_to_cache[cache_path];
+    DORIS_CHECK(*cache != nullptr);
+    for (int i = 0; i < 100; i++) {
+        if ((*cache)->get_async_open_success()) {
+            return Status::OK();
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    }
+    return Status::TimedOut("file cache async open timeout for path {}", 
cache_path);
+}
+
+void cleanup_cached_remote_reader_cache(const std::string& cache_path) {
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+    if (fs::exists(cache_path)) {
+        fs::remove_all(cache_path);
+    }
+    reset_file_cache_factory_for_test();
+}
+
+struct FileCacheMetricSnapshot {
+    int64_t total = 0;
+    int64_t local = 0;
+    int64_t remote = 0;
+    int64_t peer = 0;
+};
+
+FileCacheMetricSnapshot get_file_cache_metric_snapshot() {
+    FileCacheStatistics empty_stats;
+    FileCacheMetrics::instance().update(&empty_stats);
+    std::shared_ptr<AtomicStatistics> stats = 
FileCacheMetrics::instance().report();
+    auto local = stats->num_io_bytes_read_from_cache.load();
+    auto remote = stats->num_io_bytes_read_from_remote.load();
+    auto peer = stats->num_io_bytes_read_from_peer.load();
+    return {.total = local + remote + peer, .local = local, .remote = remote, 
.peer = peer};
+}
+
+void expect_file_cache_metric_delta(const FileCacheMetricSnapshot& before,
+                                    const FileCacheMetricSnapshot& after, 
int64_t local,
+                                    int64_t remote, int64_t peer) {
+    EXPECT_EQ(after.local - before.local, local);
+    EXPECT_EQ(after.remote - before.remote, remote);
+    EXPECT_EQ(after.peer - before.peer, peer);
+    EXPECT_EQ(after.total - before.total, local + remote + peer);
+}
+
+class FailAfterOffsetFileReader : public FileReader {
+public:
+    FailAfterOffsetFileReader(FileReaderSPtr reader, size_t fail_offset)
+            : _reader(std::move(reader)), _fail_offset(fail_offset) {}
+    ~FailAfterOffsetFileReader() override = default;
+
+    void set_fail(bool fail) { _fail = fail; }
+
+    Status close() override { return _reader->close(); }
+
+    const Path& path() const override { return _reader->path(); }
+
+    size_t size() const override { return _reader->size(); }
+
+    bool closed() const override { return _reader->closed(); }
+
+    int64_t mtime() const override { return _reader->mtime(); }
+
+protected:
+    Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+                        const IOContext* io_ctx) override {
+        if (_fail && offset >= _fail_offset) {
+            *bytes_read = 0;
+            return Status::IOError("inject remote read failure at offset {}", 
offset);
+        }
+        return _reader->read_at(offset, result, bytes_read, io_ctx);
+    }
+
+private:
+    FileReaderSPtr _reader;
+    size_t _fail_offset;
+    bool _fail = false;
+};
+
 void assert_range([[maybe_unused]] size_t assert_n, io::FileBlockSPtr 
file_block,
                   const io::FileBlock::Range& expected_range, 
io::FileBlock::State expected_state) {
     auto range = file_block->range();
@@ -3798,6 +3907,136 @@ TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_error_handle) {
 
 extern bvar::Adder<uint64_t> g_read_cache_self_heal_on_not_found;
 
+TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_warmup_does_not_update_global_metrics) {
+    bool origin_enable_direct_read = config::enable_read_cache_file_directly;
+    config::enable_read_cache_file_directly = false;
+    Defer reset_direct_read {
+            [&] { config::enable_read_cache_file_directly = 
origin_enable_direct_read; }};
+
+    std::string cache_base_path = caches_dir / 
"cached_remote_reader_warmup_metrics" / "";
+    BlockFileCache* cache = nullptr;
+    ASSERT_TRUE(create_cached_remote_reader_cache(cache_base_path, 
&cache).ok());
+    Defer cleanup_cache {[&] { 
cleanup_cached_remote_reader_cache(cache_base_path); }};
+
+    FileReaderSPtr local_reader;
+    ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader));
+    io::FileReaderOptions opts;
+    opts.cache_type = io::cache_type_from_string("file_block_cache");
+    opts.is_doris_table = true;
+    opts.tablet_id = 10086;
+    CachedRemoteFileReader reader(local_reader, opts);
+
+    std::string buffer(64_kb, '\0');
+    size_t bytes_read = 0;
+
+    auto before_normal_read = get_file_cache_metric_snapshot();
+    FileCacheStatistics normal_stats;
+    IOContext normal_ctx;
+    normal_ctx.file_cache_stats = &normal_stats;
+    ASSERT_TRUE(
+            reader.read_at(0, Slice(buffer.data(), buffer.size()), 
&bytes_read, &normal_ctx).ok());
+    EXPECT_EQ(bytes_read, 64_kb);
+    EXPECT_EQ(std::string(64_kb, '0'), buffer);
+    EXPECT_EQ(normal_stats.bytes_read_from_local, 0);
+    EXPECT_EQ(normal_stats.bytes_read_from_remote, 64_kb);
+    EXPECT_EQ(normal_stats.bytes_read_from_peer, 0);
+    EXPECT_EQ(normal_stats.num_local_io_total, 0);
+    EXPECT_EQ(normal_stats.num_remote_io_total, 1);
+    auto after_normal_read = get_file_cache_metric_snapshot();
+    expect_file_cache_metric_delta(before_normal_read, after_normal_read, 0, 
64_kb, 0);
+
+    auto before_warmup_miss = get_file_cache_metric_snapshot();
+    FileCacheStatistics warmup_miss_stats;
+    IOContext warmup_miss_ctx;
+    warmup_miss_ctx.file_cache_stats = &warmup_miss_stats;
+    warmup_miss_ctx.is_warmup = true;
+    buffer.assign(64_kb, '\0');
+    ASSERT_TRUE(
+            reader.read_at(2_mb, Slice(buffer.data(), buffer.size()), 
&bytes_read, &warmup_miss_ctx)
+                    .ok());
+    EXPECT_EQ(bytes_read, 64_kb);
+    EXPECT_EQ(std::string(64_kb, '2'), buffer);
+    EXPECT_EQ(warmup_miss_stats.bytes_read_from_local, 0);
+    EXPECT_EQ(warmup_miss_stats.bytes_read_from_remote, 64_kb);
+    EXPECT_EQ(warmup_miss_stats.bytes_read_from_peer, 0);
+    EXPECT_EQ(warmup_miss_stats.num_remote_io_total, 1);
+    auto after_warmup_miss = get_file_cache_metric_snapshot();
+    expect_file_cache_metric_delta(before_warmup_miss, after_warmup_miss, 0, 
0, 0);
+
+    auto before_warmup_hit = get_file_cache_metric_snapshot();
+    FileCacheStatistics warmup_hit_stats;
+    IOContext warmup_hit_ctx;
+    warmup_hit_ctx.file_cache_stats = &warmup_hit_stats;
+    warmup_hit_ctx.is_warmup = true;
+    buffer.assign(64_kb, '\0');
+    ASSERT_TRUE(reader.read_at(0, Slice(buffer.data(), buffer.size()), 
&bytes_read, &warmup_hit_ctx)
+                        .ok());
+    EXPECT_EQ(bytes_read, 64_kb);
+    EXPECT_EQ(std::string(64_kb, '0'), buffer);
+    EXPECT_EQ(warmup_hit_stats.bytes_read_from_local, 64_kb);
+    EXPECT_EQ(warmup_hit_stats.bytes_read_from_remote, 0);
+    EXPECT_EQ(warmup_hit_stats.bytes_read_from_peer, 0);
+    EXPECT_EQ(warmup_hit_stats.num_local_io_total, 1);
+    auto after_warmup_hit = get_file_cache_metric_snapshot();
+    expect_file_cache_metric_delta(before_warmup_hit, after_warmup_hit, 0, 0, 
0);
+
+    EXPECT_TRUE(reader.close().ok());
+    EXPECT_TRUE(reader.closed());
+}
+
+TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_mixed_hit_miss_stats_are_split_by_bytes) {
+    bool origin_enable_direct_read = config::enable_read_cache_file_directly;
+    config::enable_read_cache_file_directly = false;
+    Defer reset_direct_read {
+            [&] { config::enable_read_cache_file_directly = 
origin_enable_direct_read; }};
+
+    std::string cache_base_path = caches_dir / 
"cached_remote_reader_mixed_hit_miss_stats" / "";
+    BlockFileCache* cache = nullptr;
+    ASSERT_TRUE(create_cached_remote_reader_cache(cache_base_path, 
&cache).ok());
+    Defer cleanup_cache {[&] { 
cleanup_cached_remote_reader_cache(cache_base_path); }};
+
+    FileReaderSPtr local_reader;
+    ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader));
+    io::FileReaderOptions opts;
+    opts.cache_type = io::cache_type_from_string("file_block_cache");
+    opts.is_doris_table = true;
+    opts.tablet_id = 10086;
+    CachedRemoteFileReader reader(local_reader, opts);
+
+    std::string buffer(64_kb, '\0');
+    size_t bytes_read = 0;
+    FileCacheStatistics prime_stats;
+    IOContext prime_ctx;
+    prime_ctx.file_cache_stats = &prime_stats;
+    ASSERT_TRUE(
+            reader.read_at(0, Slice(buffer.data(), buffer.size()), 
&bytes_read, &prime_ctx).ok());
+    EXPECT_EQ(bytes_read, 64_kb);
+    EXPECT_EQ(prime_stats.bytes_read_from_remote, 64_kb);
+
+    auto before_mixed_read = get_file_cache_metric_snapshot();
+    FileCacheStatistics mixed_stats;
+    IOContext mixed_ctx;
+    mixed_ctx.file_cache_stats = &mixed_stats;
+    buffer.assign(64_kb, '\0');
+    ASSERT_TRUE(reader.read_at(1_mb - 32_kb, Slice(buffer.data(), 
buffer.size()), &bytes_read,
+                               &mixed_ctx)
+                        .ok());
+    EXPECT_EQ(bytes_read, 64_kb);
+    EXPECT_EQ(buffer.substr(0, 32_kb), std::string(32_kb, '0'));
+    EXPECT_EQ(buffer.substr(32_kb), std::string(32_kb, '1'));
+    EXPECT_EQ(mixed_stats.bytes_read_from_local, 32_kb);
+    EXPECT_EQ(mixed_stats.bytes_read_from_remote, 32_kb);
+    EXPECT_EQ(mixed_stats.bytes_read_from_peer, 0);
+    EXPECT_EQ(mixed_stats.num_local_io_total, 1);
+    EXPECT_EQ(mixed_stats.num_remote_io_total, 1);
+    EXPECT_EQ(mixed_stats.num_peer_io_total, 0);
+    auto after_mixed_read = get_file_cache_metric_snapshot();
+    expect_file_cache_metric_delta(before_mixed_read, after_mixed_read, 32_kb, 
32_kb, 0);
+
+    EXPECT_TRUE(reader.close().ok());
+    EXPECT_TRUE(reader.closed());
+}
+
 TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_self_heal_on_downloaded_not_found) {
     bool origin_enable_direct_read = config::enable_read_cache_file_directly;
     config::enable_read_cache_file_directly = false;
@@ -3823,6 +4062,7 @@ TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_self_heal_on_downloaded_not
     settings.max_query_cache_size = 0;
     
ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path, 
settings).ok());
     auto cache = FileCacheFactory::instance()->_path_to_cache[cache_base_path];
+    Defer cleanup_cache {[&] { 
cleanup_cached_remote_reader_cache(cache_base_path); }};
     for (int i = 0; i < 100; i++) {
         if (cache->get_async_open_success()) {
             break;
@@ -3832,11 +4072,12 @@ TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_self_heal_on_downloaded_not
 
     FileReaderSPtr local_reader;
     ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader));
+    auto memory_reader = std::make_shared<InMemoryFileReader>(local_reader);
     io::FileReaderOptions opts;
     opts.cache_type = io::cache_type_from_string("file_block_cache");
     opts.is_doris_table = true;
     opts.tablet_id = 10086;
-    CachedRemoteFileReader reader(local_reader, opts);
+    CachedRemoteFileReader reader(memory_reader, opts);
 
     uint64_t before_self_heal = 
g_read_cache_self_heal_on_not_found.get_value();
 
@@ -3847,25 +4088,39 @@ TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_self_heal_on_downloaded_not
     size_t bytes_read = 0;
     ASSERT_TRUE(reader.read_at(0, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx).ok());
     EXPECT_EQ(std::string(64_kb, '0'), buffer);
+    EXPECT_EQ(stats.bytes_read_from_local, 0);
+    EXPECT_EQ(stats.bytes_read_from_remote, 64_kb);
+    EXPECT_EQ(stats.bytes_read_from_peer, 0);
 
     auto key = io::BlockFileCache::hash("tmp_file");
-    {
-        io::CacheContext inspect_ctx;
-        ReadStatistics inspect_stats;
-        inspect_ctx.stats = &inspect_stats;
-        inspect_ctx.cache_type = io::FileCacheType::NORMAL;
-        auto inspect_holder = cache->get_or_set(key, 0, 64_kb, inspect_ctx);
-        auto inspect_blocks = fromHolder(inspect_holder);
-        ASSERT_EQ(inspect_blocks.size(), 1);
-        ASSERT_EQ(inspect_blocks[0]->state(), 
io::FileBlock::State::DOWNLOADED);
-        std::string cache_file = inspect_blocks[0]->get_cache_file();
-        ASSERT_TRUE(fs::exists(cache_file));
-        ASSERT_TRUE(global_local_filesystem()->delete_file(cache_file).ok());
-        ASSERT_FALSE(fs::exists(cache_file));
-    }
 
-    ASSERT_TRUE(reader.read_at(0, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx).ok());
+    auto* sp = SyncPoint::get_instance();
+    sp->enable_processing();
+    Defer defer {[&] {
+        sp->clear_call_back("LocalFileReader::read_at_impl");
+        sp->disable_processing();
+    }};
+    sp->set_call_back("LocalFileReader::read_at_impl", [&](auto&& values) {
+        std::pair<Status, bool>* pair = try_any_cast<std::pair<Status, 
bool>*>(values.back());
+        pair->first = Status::NotFound("inject cache file not found");
+        pair->second = true;
+    });
+
+    auto before_fallback_read = get_file_cache_metric_snapshot();
+    FileCacheStatistics fallback_stats;
+    IOContext fallback_ctx;
+    fallback_ctx.file_cache_stats = &fallback_stats;
+    ASSERT_TRUE(reader.read_at(0, Slice(buffer.data(), buffer.size()), 
&bytes_read, &fallback_ctx)
+                        .ok());
     EXPECT_EQ(std::string(64_kb, '0'), buffer);
+    EXPECT_EQ(fallback_stats.bytes_read_from_local, 0);
+    EXPECT_EQ(fallback_stats.bytes_read_from_remote, 64_kb);
+    EXPECT_EQ(fallback_stats.bytes_read_from_peer, 0);
+    EXPECT_EQ(fallback_stats.num_local_io_total, 0);
+    EXPECT_EQ(fallback_stats.num_remote_io_total, 1);
+    EXPECT_EQ(fallback_stats.num_peer_io_total, 0);
+    auto after_fallback_read = get_file_cache_metric_snapshot();
+    expect_file_cache_metric_delta(before_fallback_read, after_fallback_read, 
0, 64_kb, 0);
 
     bool self_healed = false;
     for (int i = 0; i < 100; ++i) {
@@ -3886,13 +4141,6 @@ TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_self_heal_on_downloaded_not
 
     EXPECT_TRUE(reader.close().ok());
     EXPECT_TRUE(reader.closed());
-    std::this_thread::sleep_for(std::chrono::seconds(1));
-    if (fs::exists(cache_base_path)) {
-        fs::remove_all(cache_base_path);
-    }
-    FileCacheFactory::instance()->_caches.clear();
-    FileCacheFactory::instance()->_path_to_cache.clear();
-    FileCacheFactory::instance()->_capacity = 0;
 }
 
 TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_no_self_heal_on_non_not_found_error) {
@@ -3983,6 +4231,60 @@ TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_no_self_heal_on_non_not_fou
     FileCacheFactory::instance()->_capacity = 0;
 }
 
+TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_failed_read_does_not_update_metrics) {
+    bool origin_enable_direct_read = config::enable_read_cache_file_directly;
+    config::enable_read_cache_file_directly = true;
+    Defer reset_direct_read {
+            [&] { config::enable_read_cache_file_directly = 
origin_enable_direct_read; }};
+
+    std::string cache_base_path = caches_dir / 
"cached_remote_reader_failed_read_metrics" / "";
+    BlockFileCache* cache = nullptr;
+    ASSERT_TRUE(create_cached_remote_reader_cache(cache_base_path, 
&cache).ok());
+    Defer cleanup_cache {[&] { 
cleanup_cached_remote_reader_cache(cache_base_path); }};
+
+    FileReaderSPtr local_reader;
+    ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader));
+    auto remote_reader = std::make_shared<InMemoryFileReader>(local_reader);
+    io::FileReaderOptions opts;
+    opts.cache_type = io::cache_type_from_string("file_block_cache");
+    opts.is_doris_table = true;
+    opts.tablet_id = 10086;
+    auto failing_remote_reader =
+            std::make_shared<FailAfterOffsetFileReader>(remote_reader, 
static_cast<size_t>(1_mb));
+    CachedRemoteFileReader reader(failing_remote_reader, opts);
+
+    std::string buffer(64_kb, '\0');
+    size_t bytes_read = 0;
+    FileCacheStatistics prime_stats;
+    IOContext prime_ctx;
+    prime_ctx.file_cache_stats = &prime_stats;
+    ASSERT_TRUE(
+            reader.read_at(0, Slice(buffer.data(), buffer.size()), 
&bytes_read, &prime_ctx).ok());
+    EXPECT_EQ(bytes_read, 64_kb);
+    EXPECT_EQ(prime_stats.bytes_read_from_remote, 64_kb);
+    EXPECT_EQ(cache->_cur_cache_size, 1_mb);
+
+    failing_remote_reader->set_fail(true);
+
+    auto before_failed_read = get_file_cache_metric_snapshot();
+    FileCacheStatistics failed_stats;
+    IOContext failed_ctx;
+    failed_ctx.file_cache_stats = &failed_stats;
+    buffer.assign(64_kb, '\0');
+    auto st = reader.read_at(1_mb - 100, Slice(buffer.data(), buffer.size()), 
&bytes_read,
+                             &failed_ctx);
+    ASSERT_FALSE(st.ok());
+    auto after_failed_read = get_file_cache_metric_snapshot();
+    expect_file_cache_metric_delta(before_failed_read, after_failed_read, 0, 
0, 0);
+    EXPECT_EQ(buffer.substr(0, 100), std::string(100, '0'));
+    EXPECT_EQ(failed_stats.bytes_read_from_local, 0);
+    EXPECT_EQ(failed_stats.bytes_read_from_remote, 0);
+    EXPECT_EQ(failed_stats.bytes_read_from_peer, 0);
+
+    EXPECT_TRUE(reader.close().ok());
+    EXPECT_TRUE(reader.closed());
+}
+
 TEST_F(BlockFileCacheTest, cached_remote_file_reader_init) {
     std::string cache_base_path = caches_dir / 
"cached_remote_file_reader_init" / "";
     if (fs::exists(cache_base_path)) {
@@ -8081,7 +8383,10 @@ TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_direct_read_bytes_check) {
     config::enable_evict_file_cache_in_advance = false;
     config::file_cache_enter_disk_resource_limit_mode_percent = 99;
 
+    bool origin_enable_direct_read = config::enable_read_cache_file_directly;
     config::enable_read_cache_file_directly = true;
+    Defer reset_direct_read {
+            [&] { config::enable_read_cache_file_directly = 
origin_enable_direct_read; }};
     if (fs::exists(cache_base_path)) {
         fs::remove_all(cache_base_path);
     }
@@ -8148,10 +8453,22 @@ TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_direct_read_bytes_check) {
               64_kb);
 
     // try to read first two blocks
+    FileCacheStatistics partial_stats;
+    IOContext partial_io_ctx;
+    partial_io_ctx.file_cache_stats = &partial_stats;
+    auto before_partial_read = get_file_cache_metric_snapshot();
     ASSERT_TRUE(reader->read_at(1048576 - 100, Slice(buffer.data(), 
buffer.size()), &bytes_read,
-                                &io_ctx)
+                                &partial_io_ctx)
                         .ok());
     std::this_thread::sleep_for(std::chrono::seconds(1));
+    EXPECT_EQ(partial_stats.bytes_read_from_local, 100);
+    EXPECT_EQ(partial_stats.bytes_read_from_remote, 64_kb - 100);
+    EXPECT_EQ(partial_stats.bytes_read_from_peer, 0);
+    EXPECT_EQ(partial_stats.num_local_io_total, 1);
+    EXPECT_EQ(partial_stats.num_remote_io_total, 1);
+    EXPECT_EQ(partial_stats.num_peer_io_total, 0);
+    auto after_partial_read = get_file_cache_metric_snapshot();
+    expect_file_cache_metric_delta(before_partial_read, after_partial_read, 
100, 64_kb - 100, 0);
     EXPECT_EQ(cache->_cur_cache_size, 2097152);
     EXPECT_EQ(g_read_cache_direct_partial_num.get_value() - 
org_g_read_cache_direct_partial_num, 1);
     EXPECT_EQ(g_read_cache_direct_partial_bytes.get_value() - 
org_g_read_cache_direct_partial_bytes,
diff --git a/regression-test/suites/cloud_p0/cache/warm_up/test_file_cache_warmup_read_metrics_docker.groovy b/regression-test/suites/cloud_p0/cache/warm_up/test_file_cache_warmup_read_metrics_docker.groovy
new file mode 100644
index 00000000000..e2fc6d6fe65
--- /dev/null
+++ b/regression-test/suites/cloud_p0/cache/warm_up/test_file_cache_warmup_read_metrics_docker.groovy
@@ -0,0 +1,457 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import groovy.json.JsonSlurper
+import java.net.ServerSocket
+
+suite('test_file_cache_warmup_read_metrics_docker', 'docker') {
+    if (!isCloudMode()) {
+        return
+    }
+
+    def allocatePort = {
+        def socket = new ServerSocket(0)
+        def port = socket.localPort
+        socket.close()
+        return port
+    }
+    def minioPort = allocatePort()
+    def minioBucket = "test-bucket"
+    def minioContainer = 
"doris-file-cache-warmup-metrics-minio-${System.currentTimeMillis()}"
+
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'cloud_tablet_rebalancer_interval_second=1',
+        'fetch_cluster_cache_hotspot_interval_ms=1000',
+        'heartbeat_interval_second=1',
+        'cloud_warm_up_for_rebalance_type=async_warmup',
+        'cloud_pre_heating_time_limit_sec=180',
+        'auto_check_statistics_in_minutes=60',
+    ]
+    options.beConfigs += [
+        'file_cache_enter_disk_resource_limit_mode_percent=99',
+        'enable_evict_file_cache_in_advance=false',
+        'file_cache_background_monitor_interval_ms=1000',
+        'report_tablet_interval_seconds=1',
+        'schedule_sync_tablets_interval_s=18000',
+        'disable_auto_compaction=true',
+        'disable_segment_cache=true',
+        'disable_storage_page_cache=true',
+        'enable_cache_read_from_peer=false',
+    ]
+    options.cloudStoreConfigs += [
+        'DORIS_CLOUD_USER=minio',
+        'DORIS_CLOUD_AK=minioadmin',
+        'DORIS_CLOUD_SK=minioadmin',
+        "DORIS_CLOUD_BUCKET=${minioBucket}",
+        "DORIS_CLOUD_ENDPOINT=host.docker.internal:${minioPort}",
+        "DORIS_CLOUD_EXTERNAL_ENDPOINT=host.docker.internal:${minioPort}",
+        'DORIS_CLOUD_REGION=us-east-1',
+        'DORIS_CLOUD_PROVIDER=S3',
+    ]
+    options.extraHosts += [
+        'host.docker.internal:host-gateway',
+        "${minioBucket}.host.docker.internal:host-gateway",
+    ]
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.cloudMode = true
+
+    def computeCluster = "compute_cluster"
+    def sourceCluster = "metrics_warmup_source"
+    def targetCluster = "metrics_warmup_target"
+    def payload = 
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
+
+    def waitForCondition = { condition, timeoutMs, message ->
+        long start = System.currentTimeMillis()
+        while (System.currentTimeMillis() - start < timeoutMs) {
+            if (condition()) {
+                return
+            }
+            sleep(1000)
+        }
+        if (!condition()) {
+            throw new RuntimeException(message)
+        }
+    }
+
+    def startMinio = {
+        cmd """docker rm -f ${minioContainer} >/dev/null 2>&1 || true"""
+        cmd """
+            docker run -d --name ${minioContainer} \
+                -p ${minioPort}:9000 \
+                -e MINIO_ROOT_USER=minioadmin \
+                -e MINIO_ROOT_PASSWORD=minioadmin \
+                -e MINIO_DOMAIN=host.docker.internal \
+                minio/minio:RELEASE.2024-11-07T00-52-20Z \
+                server /data --console-address ':9001'
+        """
+        waitForCondition({
+            try {
+                new URL("http://127.0.0.1:${minioPort}/minio/health/ready").text
+                return true
+            } catch (Throwable ignored) {
+                return false
+            }
+        }, 60000, "MinIO did not become ready")
+        cmd """
+            docker exec ${minioContainer} sh -c \
+                'mc alias set local http://localhost:9000 minioadmin minioadmin && mc mb -p local/${minioBucket}'
+        """
+    }
+
+    def stopMinio = {
+        cmd """docker rm -f ${minioContainer} >/dev/null 2>&1 || true"""
+    }
+
+    def getClusterBackends = { clusterName ->
+        def backends = sql """SHOW BACKENDS"""
+        def clusterBackends = backends.findAll {
+            it[19].contains("""\"compute_group_name\" : \"${clusterName}\"""")
+        }
+        assertTrue(clusterBackends.size() > 0, "No backend found for cluster ${clusterName}")
+        return clusterBackends
+    }
+
+    def getPromMetric = { ip, httpPort, name ->
+        def metrics = new URL("http://${ip}:${httpPort}/metrics").text
+        def matcher = metrics =~ ~"(?m)^${name}\\s+([0-9]+(?:\\.[0-9]+)?)"
+        if (matcher.find()) {
+            return new BigDecimal(matcher[0][1]).longValue()
+        }
+        logger.info("${name} not found in /metrics for ${ip}:${httpPort}, treat it as 0")
+        return 0L
+    }
+
+    def getBrpcMetric = { ip, brpcPort, name ->
+        def metrics = new URL("http://${ip}:${brpcPort}/brpc_metrics").text
+        def matcher = metrics =~ ~"(?m)^.*${name}\\s+([0-9]+)"
+        if (matcher.find()) {
+            return matcher[0][1] as long
+        }
+        logger.info("${name} not found in /brpc_metrics for ${ip}:${brpcPort}, treat it as 0")
+        return 0L
+    }
+
+    def getBackendReadMetrics = { be ->
+        def ip = be[1]
+        def httpPort = be[4]
+        return [
+            total : getPromMetric(ip, httpPort, 
"doris_be_num_io_bytes_read_total"),
+            local : getPromMetric(ip, httpPort, 
"doris_be_num_io_bytes_read_from_cache"),
+            remote: getPromMetric(ip, httpPort, 
"doris_be_num_io_bytes_read_from_remote"),
+            peer  : getPromMetric(ip, httpPort, 
"doris_be_num_io_bytes_read_from_peer"),
+        ]
+    }
+
+    def getClusterReadMetrics = { clusterName ->
+        def result = [total: 0L, local: 0L, remote: 0L, peer: 0L]
+        getClusterBackends(clusterName).each { be ->
+            def metrics = getBackendReadMetrics(be)
+            result.total += metrics.total
+            result.local += metrics.local
+            result.remote += metrics.remote
+            result.peer += metrics.peer
+        }
+        return result
+    }
+
+    def getMetricDelta = { before, after ->
+        return [
+            total : after.total - before.total,
+            local : after.local - before.local,
+            remote: after.remote - before.remote,
+            peer  : after.peer - before.peer,
+        ]
+    }
+
+    def assertMetricInvariant = { metrics ->
+        assertEquals(metrics.total, metrics.local + metrics.remote + 
metrics.peer)
+    }
+
+    def assertNoReadMetricDelta = { before, after, label ->
+        def delta = getMetricDelta(before, after)
+        logger.info("${label} read metric delta: ${delta}")
+        assertEquals(0L, delta.total)
+        assertEquals(0L, delta.local)
+        assertEquals(0L, delta.remote)
+        assertEquals(0L, delta.peer)
+    }
+
+    def clearFileCache = { ip, httpPort ->
+        def response = new URL("http://${ip}:${httpPort}/api/file_cache?op=clear&sync=true").text
+        def json = new JsonSlurper().parseText(response)
+        if (json.status != "OK") {
+            throw new RuntimeException("Clear cache on ${ip}:${httpPort} failed: ${json.status}")
+        }
+    }
+
+    def clearFileCacheOnCluster = { clusterName ->
+        getClusterBackends(clusterName).each { be ->
+            clearFileCache(be[1], be[4])
+        }
+        sleep(5000)
+    }
+
+    def getBackendCacheSize = { be ->
+        return getBrpcMetric(be[1], be[5], "cache_cache_size")
+    }
+
+    def getClusterCacheSizeSum = { clusterName ->
+        long sum = 0
+        getClusterBackends(clusterName).each { be ->
+            sum += getBackendCacheSize(be)
+        }
+        return sum
+    }
+
+    def getClusterBrpcMetricSum = { clusterName, metricName ->
+        long sum = 0
+        getClusterBackends(clusterName).each { be ->
+            sum += getBrpcMetric(be[1], be[5], metricName)
+        }
+        return sum
+    }
+
+    def createTable = { table, buckets ->
+        sql """DROP TABLE IF EXISTS ${table}"""
+        sql """
+            CREATE TABLE ${table} (
+                k1 INT,
+                v1 VARCHAR(4096)
+            )
+            DUPLICATE KEY(k1)
+            DISTRIBUTED BY HASH(k1) BUCKETS ${buckets}
+            PROPERTIES (
+                "replication_num" = "1",
+                "file_cache_ttl_seconds" = "3600"
+            )
+        """
+    }
+
+    def loadRows = { table, rowStart, rowCount ->
+        int batchSize = 100
+        for (int begin = 0; begin < rowCount; begin += batchSize) {
+            int end = Math.min(rowCount, begin + batchSize)
+            def values = (begin..<end).collect { idx ->
+                int key = rowStart + idx
+                return "(${key}, '${payload}_${key}_${payload}')"
+            }.join(",")
+            sql """INSERT INTO ${table} VALUES ${values}"""
+        }
+        sql """sync"""
+    }
+
+    def queryTable = { table ->
+        def result = sql """SELECT SUM(LENGTH(v1)) FROM ${table} WHERE k1 >= 
0"""
+        assertTrue((result[0][0] as String).toLong() > 0)
+    }
+
+    def waitWarmupFinished = { jobId ->
+        waitForCondition({
+            def jobInfo = sql """SHOW WARM UP JOB WHERE ID = ${jobId}"""
+            if (jobInfo.size() == 0) {
+                return false
+            }
+            def state = jobInfo[0][3].toString()
+            if (state == "CANCELLED") {
+                throw new RuntimeException("Warm up job ${jobId} was cancelled")
+            }
+            return state == "FINISHED"
+        }, 120000, "Warm up job ${jobId} did not finish")
+    }
+
+    def cancelWarmupJob = { jobId ->
+        sql """CANCEL WARM UP JOB WHERE ID = ${jobId}"""
+        def jobInfo = sql """SHOW WARM UP JOB WHERE ID = ${jobId}"""
+        assertEquals("CANCELLED", jobInfo[0][3])
+    }
+
+    try {
+        startMinio()
+
+        docker(options) {
+            cluster.addBackend(1, sourceCluster)
+            cluster.addBackend(1, targetCluster)
+
+            sql """SET enable_file_cache = true"""
+            sql """SET enable_sql_cache = false"""
+            sql """SET enable_query_cache = false"""
+
+            sql """use @${computeCluster}"""
+            def manualTable = "test_file_cache_metrics_manual"
+            createTable(manualTable, 1)
+            loadRows(manualTable, 1, 300)
+            clearFileCacheOnCluster(computeCluster)
+
+            def beforeQueryMiss = getClusterReadMetrics(computeCluster)
+            queryTable(manualTable)
+            waitForCondition({
+                def delta = getMetricDelta(beforeQueryMiss, 
getClusterReadMetrics(computeCluster))
+                return delta.total > 0 && delta.remote > 0
+            }, 30000, "Normal query miss did not increase global remote read metrics")
+            def queryMissDelta = getMetricDelta(beforeQueryMiss, 
getClusterReadMetrics(computeCluster))
+            logger.info("query miss read metric delta: ${queryMissDelta}")
+            assertMetricInvariant(queryMissDelta)
+            assertTrue(queryMissDelta.remote > 0)
+
+            clearFileCacheOnCluster(computeCluster)
+            def beforeManualWarmup = getClusterReadMetrics(computeCluster)
+            def manualJob = sql """WARM UP CLUSTER ${computeCluster} WITH 
TABLE ${manualTable}"""
+            waitWarmupFinished(manualJob[0][0])
+            waitForCondition({
+                return getClusterCacheSizeSum(computeCluster) > 0
+            }, 30000, "Manual table warm up did not populate file cache")
+            def afterManualWarmup = getClusterReadMetrics(computeCluster)
+            assertNoReadMetricDelta(beforeManualWarmup, afterManualWarmup, 
"manual table warm up")
+
+            def beforeQueryHit = getClusterReadMetrics(computeCluster)
+            queryTable(manualTable)
+            waitForCondition({
+                def delta = getMetricDelta(beforeQueryHit, 
getClusterReadMetrics(computeCluster))
+                return delta.total > 0 && delta.local > 0
+            }, 30000, "Query after warm up did not increase global local read metrics")
+            def queryHitDelta = getMetricDelta(beforeQueryHit, 
getClusterReadMetrics(computeCluster))
+            logger.info("query hit read metric delta: ${queryHitDelta}")
+            assertMetricInvariant(queryHitDelta)
+            assertTrue(queryHitDelta.local > 0)
+
+            sql """use @${sourceCluster}"""
+            sql """TRUNCATE TABLE __internal_schema.cloud_cache_hotspot"""
+            def periodicTable = "test_file_cache_metrics_periodic"
+            createTable(periodicTable, 1)
+            loadRows(periodicTable, 1000, 300)
+            clearFileCacheOnCluster(sourceCluster)
+            clearFileCacheOnCluster(targetCluster)
+
+            def beforePeriodicWarmup = getClusterReadMetrics(targetCluster)
+            def periodicJob = sql """
+                WARM UP CLUSTER ${targetCluster} WITH CLUSTER ${sourceCluster}
+                PROPERTIES (
+                    "sync_mode" = "periodic",
+                    "sync_interval_sec" = "1"
+                )
+            """
+            for (int i = 0; i < 120; i++) {
+                queryTable(periodicTable)
+            }
+            waitForCondition({
+                return getClusterCacheSizeSum(sourceCluster) > 0 &&
+                        getClusterCacheSizeSum(targetCluster) > 0
+            }, 90000, "Periodic warm up did not populate target file cache")
+            def afterPeriodicWarmup = getClusterReadMetrics(targetCluster)
+            assertNoReadMetricDelta(beforePeriodicWarmup, afterPeriodicWarmup,
+                    "periodic cluster warm up")
+            cancelWarmupJob(periodicJob[0][0])
+
+            def eventTable = "test_file_cache_metrics_event"
+            createTable(eventTable, 1)
+            clearFileCacheOnCluster(sourceCluster)
+            clearFileCacheOnCluster(targetCluster)
+            def beforeEventWarmup = getClusterReadMetrics(targetCluster)
+            def eventJob = sql """
+                WARM UP CLUSTER ${targetCluster} WITH CLUSTER ${sourceCluster}
+                PROPERTIES (
+                    "sync_mode" = "event_driven",
+                    "sync_event" = "load"
+                )
+            """
+            waitForCondition({
+                def jobInfo = sql """SHOW WARM UP JOB WHERE ID = 
${eventJob[0][0]}"""
+                return jobInfo.size() > 0 && jobInfo[0][3].toString() in 
["RUNNING", "PENDING"]
+            }, 30000, "Event-driven warm up job did not enter running state")
+            sleep(15000)
+            def beforeEventSubmitted = getClusterBrpcMetricSum(targetCluster,
+                    "file_cache_event_driven_warm_up_submitted_segment_num")
+            def beforeEventFinished = getClusterBrpcMetricSum(targetCluster,
+                    "file_cache_event_driven_warm_up_finished_segment_num")
+            def beforeEventFailed = getClusterBrpcMetricSum(targetCluster,
+                    "file_cache_event_driven_warm_up_failed_segment_num")
+            loadRows(eventTable, 2000, 300)
+            waitForCondition({
+                def submitted = getClusterBrpcMetricSum(targetCluster,
+                        
"file_cache_event_driven_warm_up_submitted_segment_num")
+                def finished = getClusterBrpcMetricSum(targetCluster,
+                        "file_cache_event_driven_warm_up_finished_segment_num")
+                return submitted > beforeEventSubmitted && finished >= 
submitted
+            }, 90000, "Event-driven warm up did not finish target segment downloads")
+            def afterEventSubmitted = getClusterBrpcMetricSum(targetCluster,
+                    "file_cache_event_driven_warm_up_submitted_segment_num")
+            def afterEventFinished = getClusterBrpcMetricSum(targetCluster,
+                    "file_cache_event_driven_warm_up_finished_segment_num")
+            def afterEventFailed = getClusterBrpcMetricSum(targetCluster,
+                    "file_cache_event_driven_warm_up_failed_segment_num")
+            logger.info("event-driven warm up segment metrics: submitted ${beforeEventSubmitted}"
+                    + " -> ${afterEventSubmitted}, finished ${beforeEventFinished}"
+                    + " -> ${afterEventFinished}, failed ${beforeEventFailed}"
+                    + " -> ${afterEventFailed}")
+            assertEquals(beforeEventFailed, afterEventFailed)
+            def afterEventWarmup = getClusterReadMetrics(targetCluster)
+            assertNoReadMetricDelta(beforeEventWarmup, afterEventWarmup,
+                    "event-driven cluster warm up")
+            cancelWarmupJob(eventJob[0][0])
+
+            sql """use @${computeCluster}"""
+            def rebalanceTable = "test_file_cache_metrics_rebalance"
+            createTable(rebalanceTable, 8)
+            loadRows(rebalanceTable, 3000, 500)
+            clearFileCacheOnCluster(computeCluster)
+            def beforeRebalanceSourceQuery = 
getClusterReadMetrics(computeCluster)
+            queryTable(rebalanceTable)
+            waitForCondition({
+                def delta = getMetricDelta(beforeRebalanceSourceQuery,
+                        getClusterReadMetrics(computeCluster))
+                return delta.total > 0 && delta.remote > 0
+            }, 30000, "Rebalance source query did not populate source file cache")
+
+            def beforeBackendIds = getClusterBackends(computeCluster).collect 
{ it[0].toString() } as Set
+            def beforeRebalanceWarmup = getClusterReadMetrics(computeCluster)
+            cluster.addBackend(1, computeCluster)
+            def newBackendHolder = [:]
+            waitForCondition({
+                def added = getClusterBackends(computeCluster).find {
+                    !beforeBackendIds.contains(it[0].toString())
+                }
+                if (added != null) {
+                    newBackendHolder.be = added
+                    return true
+                }
+                return false
+            }, 30000, "New backend was not added to ${computeCluster}")
+
+            waitForCondition({
+                def distribution = sql_return_maparray """ADMIN SHOW REPLICA 
DISTRIBUTION FROM ${rebalanceTable}"""
+                return distribution.any {
+                    it.BackendId.toString() == 
newBackendHolder.be[0].toString() &&
+                            Integer.valueOf(it.ReplicaNum.toString()) > 0
+                }
+            }, 180000, "Rebalance did not move any replica to the new backend")
+
+            waitForCondition({
+                return getBackendCacheSize(newBackendHolder.be) > 0
+            }, 180000, "Rebalance warm up did not populate file cache on the new backend")
+
+            def afterRebalanceWarmup = getClusterReadMetrics(computeCluster)
+            assertNoReadMetricDelta(beforeRebalanceWarmup, 
afterRebalanceWarmup,
+                    "rebalance warm up")
+        }
+    } finally {
+        stopMinio()
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to