github-actions[bot] commented on code in PR #63818:
URL: https://github.com/apache/doris/pull/63818#discussion_r3386153959


##########
be/src/cloud/cloud_cluster_info.h:
##########
@@ -85,12 +85,15 @@ class CloudClusterInfo : public ClusterInfo {
     // Check if this cluster should skip compaction for the given tablet
     // Returns true if should skip (i.e., another cluster should do the 
compaction)
     bool should_skip_compaction(CloudTablet* tablet) const;
+    const std::string& cloud_compute_group_id() const { return 
_cloud_compute_group_id; }

Review Comment:
   This getter/setter pair needs the same synchronization as `_my_cluster_id` 
(and the getter should return a copied string under the lock). The heartbeat 
thread writes `_cloud_compute_group_id` in `HeartbeatServer::_heartbeat()`, 
while scan/cache threads read it in `collect_race_result()` to classify peer 
wins. Returning a reference to a `std::string` that can be assigned 
concurrently is a C++ data race on the string storage.



##########
be/src/exec/scan/parallel_scanner_builder.cpp:
##########
@@ -180,34 +198,83 @@ Status 
ParallelScannerBuilder::_build_scanners_by_per_segment(std::list<ScannerS
             
ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(*tablet);
         }
 
-        // For each RowSet split in the read source, split by segment id and 
build
-        // one scanner per segment. Keep delete predicates shared.
+        // Collect segments into scanners based on rows count instead of one 
scanner per segment
+        TabletReadSource partitial_read_source;
+        int64_t rows_collected = 0;
+
         for (auto& rs_split : entire_read_source.rs_splits) {
             auto reader = rs_split.rs_reader;
             auto rowset = reader->rowset();
             const auto rowset_id = rowset->rowset_id();
+
             const auto& segments_rows = _all_segments_rows[rowset_id];
             if (segments_rows.empty() || rowset->num_rows() == 0) {
                 continue;
             }
 
-            // Build scanners for [i, i+1) segment range, without row-range 
slicing.
-            for (int64_t i = 0; i < rowset->num_segments(); ++i) {
-                RowSetSplits split(reader->clone());
-                split.segment_offsets.first = i;
-                split.segment_offsets.second = i + 1;
-                // No row-ranges slicing; scan whole segment i.
-                DCHECK_GE(split.segment_offsets.second, 
split.segment_offsets.first + 1);
+            int64_t segment_start = 0;
+            auto split = RowSetSplits(reader->clone());
+
+            for (size_t i = 0; i < segments_rows.size(); ++i) {
+                const size_t rows_of_segment = segments_rows[i];
+
+                // Check if adding this segment would exceed rows_per_scanner
+                // 0.9: try to avoid splitting the segments into excessively 
small parts.
+                if (rows_collected > 0 && (rows_collected + rows_of_segment > 
_rows_per_scanner &&
+                                           rows_collected < _rows_per_scanner 
* 9 / 10)) {
+                    // Create a new scanner with collected segments
+                    split.segment_offsets.first = segment_start;
+                    split.segment_offsets.second =
+                            i; // Range is [segment_start, i), including all 
segments from segment_start to i-1
+

Review Comment:
   This can enqueue an empty split when the flush happens at the first segment 
of a new rowset. `rows_collected` intentionally carries across rowsets, so if 
the previous rowset has some rows and `i == 0` in the next rowset makes 
`rows_collected + rows_of_segment > _rows_per_scanner`, `segment_offsets` 
becomes `[0, 0)`: the debug build hits `DCHECK_GT`, and release builds pass an 
empty `RowSetSplits` to the scanner. Flush the already collected 
`partitial_read_source` before adding a split for the new rowset, or only 
emplace this rowset split when `i > segment_start`.



##########
be/src/io/cache/cached_remote_file_reader.cpp:
##########
@@ -158,247 +193,530 @@ std::pair<size_t, size_t> 
CachedRemoteFileReader::s_align_size(size_t offset, si
 }
 
 namespace {
-// Execute S3 read
-Status execute_s3_read(size_t empty_start, size_t& size, 
std::unique_ptr<char[]>& buffer,
-                       ReadStatistics& stats, const IOContext* io_ctx,
-                       FileReaderSPtr remote_file_reader) {
-    s3_read_counter << 1;
-    SCOPED_RAW_TIMER(&stats.remote_read_timer);
-    stats.from_peer_cache = false;
-    return remote_file_reader->read_at(empty_start, Slice(buffer.get(), size), 
&size, io_ctx);
+struct PeerFetchLayout {
+    std::vector<size_t> block_offsets;
+    std::vector<size_t> block_sizes;
+    size_t total_size = 0;
+};
+
+bool contains_file_block(const PeerFetchedBlockSet& fetched_blocks, const 
FileBlockSPtr& block) {
+    return fetched_blocks.contains(block.get());
 }
 
-// Get peer connection info from tablet_id
-std::pair<std::string, int> get_peer_connection_info(int64_t tablet_id,
-                                                     const std::string& 
file_path) {
-    std::string host = "";
-    int port = 0;
+size_t clip_peer_block_size(const FileBlock::Range& range, size_t file_size) {
+    if (range.left >= file_size) {
+        return 0;
+    }
+    return std::min(file_size - range.left, range.size());
+}
 
-    DCHECK(tablet_id > 0);
-    auto& manager = 
ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
-    if (auto tablet_info = manager.get_balanced_tablet_info(tablet_id)) {
-        host = tablet_info->first;
-        port = tablet_info->second;
-    } else {
-        LOG_EVERY_N(WARNING, 100) << "get peer connection info not found"
-                                  << ", tablet_id=" << tablet_id << ", 
file_path=" << file_path;
+PeerFetchLayout build_peer_fetch_layout(const std::vector<FileBlockSPtr>& 
blocks,
+                                        size_t file_size) {
+    PeerFetchLayout layout;
+    layout.block_offsets.reserve(blocks.size());
+    layout.block_sizes.reserve(blocks.size());
+    for (const auto& block : blocks) {
+        const size_t block_size = clip_peer_block_size(block->range(), 
file_size);
+        layout.block_offsets.push_back(layout.total_size);
+        layout.block_sizes.push_back(block_size);
+        layout.total_size += block_size;
     }
+    return layout;
+}
 
-    DBUG_EXECUTE_IF("PeerFileCacheReader::_fetch_from_peer_cache_blocks", {
-        host = dp->param<std::string>("host", "127.0.0.1");
-        port = dp->param("port", 9060);
-        LOG_WARNING("debug point 
PeerFileCacheReader::_fetch_from_peer_cache_blocks")
-                .tag("host", host)
-                .tag("port", port);
-    });
+Status write_peer_payloads_into_block(const FileBlockSPtr& block,
+                                      std::vector<const PeerFetchChunk*>& 
chunks,
+                                      size_t* block_size) {
+    if (block_size == nullptr) {
+        return Status::InvalidArgument("peer block write requires non-null 
block_size");
+    }
+    *block_size = 0;
+    if (chunks.empty()) {
+        return Status::OK();
+    }
+    std::sort(chunks.begin(), chunks.end(),
+              [](const PeerFetchChunk* lhs, const PeerFetchChunk* rhs) {
+                  return lhs->block_offset < rhs->block_offset;
+              });
+    butil::IOBuf payload;
+    for (const auto* chunk : chunks) {
+        *block_size += chunk->payload.length();
+        payload.append(chunk->payload);
+    }
+    DCHECK(*block_size != 0);
+    return block->append_iobuf(payload);
+}
 
-    return {host, port};
+void copy_peer_chunk_to_result(const PeerFetchChunk& chunk, size_t offset, 
size_t right_offset,
+                               size_t already_read, Slice result, size_t& 
indirect_read_bytes,
+                               SourceReadBreakdown& source_read_breakdown) {
+    const size_t payload_size = chunk.payload.length();
+    if (payload_size == 0) {
+        return;
+    }
+    const size_t chunk_left = chunk.block_offset;
+    const size_t chunk_right = chunk_left + payload_size - 1;
+    const size_t copy_left_offset = std::max(offset + already_read, 
chunk_left);
+    const size_t copy_right_offset = std::min(right_offset, chunk_right);
+    if (copy_left_offset > copy_right_offset) {
+        return;
+    }
+    const size_t copy_offset = copy_left_offset - chunk_left;
+    const size_t copy_size = copy_right_offset - copy_left_offset + 1;
+    char* dst = result.data + (copy_left_offset - offset);
+    chunk.payload.copy_to(dst, copy_size, copy_offset);
+    indirect_read_bytes += copy_size;
+    source_read_breakdown.peer_bytes += copy_size;
 }
 
-// Execute peer read with fallback to S3
-// file_size is the size of the file
-// used to calculate the rightmost boundary value of the block, due to 
inaccurate current block meta information.
-Status execute_peer_read(const std::vector<FileBlockSPtr>& empty_blocks, 
size_t empty_start,
-                         size_t& size, std::unique_ptr<char[]>& buffer,
-                         const std::string& file_path, size_t file_size, bool 
is_doris_table,
-                         int64_t tablet_id, ReadStatistics& stats, const 
IOContext* io_ctx) {
-    auto [host, port] = get_peer_connection_info(tablet_id, file_path);
+// Execute peer read targeting a specific host:port.
+Status execute_peer_read(const std::vector<FileBlockSPtr>& empty_blocks,
+                         PeerFetchResult* peer_result, const std::string& 
file_path,
+                         size_t file_size, bool is_doris_table, 
ReadStatistics& stats,
+                         const IOContext* io_ctx, const std::string& host, 
int32_t port) {
     VLOG_DEBUG << "PeerFileCacheReader read from peer, host=" << host << ", 
port=" << port
                << ", file_path=" << file_path;
 
     if (host.empty() || port == 0) {
         g_failed_get_peer_addr_counter << 1;
-        VLOG_DEBUG << "PeerFileCacheReader host or port is empty"
-                   << ", host=" << host << ", port=" << port << ", file_path=" 
<< file_path;
+        LOG_EVERY_N(WARNING, 100) << "PeerFileCacheReader host or port is 
empty"
+                                  << ", host=" << host << ", port=" << port
+                                  << ", file_path=" << file_path;
         return Status::InternalError<false>("host or port is empty");
     }
     SCOPED_RAW_TIMER(&stats.peer_read_timer);
     peer_read_counter << 1;
     PeerFileCacheReader peer_reader(file_path, is_doris_table, host, port);
-    auto st = peer_reader.fetch_blocks(empty_blocks, empty_start, 
Slice(buffer.get(), size), &size,
-                                       file_size, io_ctx);
+    // Serial peer read: source BE has the data from rebalance warm-up; no 
fill needed.
+    auto st = peer_reader.fetch_blocks(empty_blocks, peer_result, file_size, 
io_ctx,
+                                       /*request_fill=*/false);
     if (!st.ok()) {
-        VLOG_DEBUG << "PeerFileCacheReader read from peer failed"
-                   << ", host=" << host << ", port=" << port << ", error=" << 
st.msg();
+        LOG_WARNING("PeerFileCacheReader read from peer failed")
+                .tag("host", host)
+                .tag("port", port)
+                .tag("error", st.msg());
     }
-    stats.from_peer_cache = true;
+    stats.from_peer_cache = st.ok();
     return st;
 }
 
-} // anonymous namespace
+// Execute S3 read
+Status execute_s3_read(size_t empty_start, size_t& size, 
std::unique_ptr<char[]>& buffer,
+                       ReadStatistics& stats, const IOContext* io_ctx,
+                       FileReaderSPtr remote_file_reader) {
+    s3_read_counter << 1;
+    SCOPED_RAW_TIMER(&stats.remote_read_timer);
+    stats.from_peer_cache = false;
+    return remote_file_reader->read_at(empty_start, Slice(buffer.get(), size), 
&size, io_ctx);
+}
 
-Status CachedRemoteFileReader::_execute_remote_read(const 
std::vector<FileBlockSPtr>& empty_blocks,
-                                                    size_t empty_start, 
size_t& size,
-                                                    std::unique_ptr<char[]>& 
buffer,
-                                                    ReadStatistics& stats,
-                                                    const IOContext* io_ctx) {
-    DBUG_EXECUTE_IF("CachedRemoteFileReader.read_at_impl.change_type", {
-        // Determine read type from debug point or default to S3
-        std::string read_type = "s3";
-        read_type = dp->param<std::string>("type", "s3");
-        LOG_WARNING("CachedRemoteFileReader.read_at_impl.change_type")
-                .tag("path", path().native())
-                .tag("off", empty_start)
-                .tag("size", size)
-                .tag("type", read_type);
-        // Execute appropriate read strategy
-        if (read_type == "s3") {
-            return execute_s3_read(empty_start, size, buffer, stats, io_ctx, 
_remote_file_reader);
-        } else {
-            return execute_peer_read(empty_blocks, empty_start, size, buffer, 
path().native(),
-                                     this->size(), _is_doris_table, 
_tablet_id, stats, io_ctx);
+CloudWarmUpManager& get_warm_up_manager() {
+    return 
ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
+}
+
+// Shared state for peer-vs-S3 winner race.
+// Uses bthread primitives — never std::mutex/condition_variable in bthread 
context.
+struct RaceState {
+    bthread::Mutex mtx;
+    bthread::ConditionVariable cv;
+    int winner = -1; // 0=peer won, 1=s3 won, -1=undecided, -2=both failed
+    bool peer_done = false;
+    bool s3_done = false;
+    Status peer_status;
+    Status s3_status;
+    std::unique_ptr<char[]> s3_buf;
+    PeerFetchResult peer_res;
+    std::string peer_winner_cg_id; // compute_group_id of the winning peer 
candidate
+};
+
+// Peer race logic: try candidates sequentially until one succeeds or all fail.
+void run_peer_race(std::shared_ptr<RaceState> race, std::vector<FileBlockSPtr> 
empty_blocks,
+                   const std::string& file_path, size_t file_sz, bool is_doris,
+                   const IOContext* io_ctx, std::vector<doris::PeerCandidate> 
candidates,
+                   int64_t tablet_id, std::shared_ptr<ResourceContext> 
parent_resource_ctx) {
+    std::unique_ptr<AttachTask> attach_task;
+    if (parent_resource_ctx != nullptr) {
+        attach_task = std::make_unique<AttachTask>(parent_resource_ctx);
+    }
+
+    auto& manager = get_warm_up_manager();
+    bool all_tried = true;
+
+    for (size_t i = 0; i < candidates.size(); ++i) {
+        // Before issuing the next RPC, check if S3 already won.
+        if (i > 0) {
+            TEST_SYNC_POINT("run_peer_race::between_candidates");
+            std::unique_lock<bthread::Mutex> lk(race->mtx);
+            if (race->winner > 0) {
+                // S3 already won — stop, but not all candidates were tried.
+                all_tried = false;
+                break;
+            }
         }
-    });
 
-    if (!doris::config::is_cloud_mode() || !_is_doris_table || 
io_ctx->is_warmup ||
-        !doris::config::enable_cache_read_from_peer) {
-        return execute_s3_read(empty_start, size, buffer, stats, io_ctx, 
_remote_file_reader);
-    } else {
-        // first try peer read, if peer failed, fallback to S3
-        // peer timeout is 5 seconds
-        // TODO(dx): here peer and s3 reader need to get data in parallel, and 
take the one that is correct and returns first
-        // ATTN: Save original size before peer read, as it may be modified by 
fetch_blocks, read peer ref size
-        size_t original_size = size;
-        auto st = execute_peer_read(empty_blocks, empty_start, size, buffer, 
path().native(),
-                                    this->size(), _is_doris_table, _tablet_id, 
stats, io_ctx);
-        if (!st.ok()) {
-            // Restore original size for S3 fallback, as peer read may have 
modified it
-            size = original_size;
-            // Fallback to S3
-            return execute_s3_read(empty_start, size, buffer, stats, io_ctx, 
_remote_file_reader);
+        const auto& cand = candidates[i];
+        peer_read_counter << 1;
+        PeerFileCacheReader peer_reader(file_path, is_doris, cand.host, 
cand.brpc_port);
+        PeerFetchResult local_peer_res;
+        const bool request_fill = 
!config::peer_cache_fill_compute_group_id.empty() &&
+                                  cand.compute_group_id == 
config::peer_cache_fill_compute_group_id;
+        auto st = peer_reader.fetch_blocks(empty_blocks, &local_peer_res, 
file_sz, io_ctx,
+                                           request_fill, tablet_id);
+        if (st.ok()) {
+            manager.update_peer_candidate_on_success(tablet_id, 
cand.compute_group_id);
+            std::unique_lock<bthread::Mutex> lk(race->mtx);
+            if (race->winner < 0) {
+                race->winner = 0;
+                race->peer_res = std::move(local_peer_res);
+                race->peer_winner_cg_id = cand.compute_group_id;
+            }
+            race->peer_done = true;
+            race->peer_status = Status::OK();
+            race->cv.notify_all();
+            return;
+        }
+
+        // Handle per-candidate failure.
+        if (st.template is<ErrorCode::TOO_MANY_TASKS>()) {
+            all_tried = false;
+            break;
+        }
+        if (st.template is<ErrorCode::NOT_FOUND>()) {
+            manager.rotate_peer_candidate_on_cache_miss(tablet_id, cand.host, 
cand.brpc_port);
+        } else {
+            manager.update_peer_candidate_on_rpc_failure(tablet_id, cand.host, 
cand.brpc_port);
         }
-        return st;
     }
+
+    if (all_tried) {
+        manager.record_peer_all_miss(tablet_id);
+    }
+    std::unique_lock<bthread::Mutex> lk(race->mtx);
+    race->peer_done = true;
+    race->peer_status = Status::InternalError<false>("peer: all candidates 
failed");
+    if (race->winner < 0 && race->s3_done) {
+        race->winner = race->s3_status.ok() ? 1 : -2;
+    }
+    race->cv.notify_all();
 }
 
-Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, 
size_t* bytes_read,
-                                            const IOContext* io_ctx) {
-    size_t already_read = 0;
-    
SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().cached_remote_reader_read_at);
+// Apply hedge delay, then submit S3 read to the thread pool (or run inline).
+void launch_s3_race(std::shared_ptr<RaceState> race, size_t empty_start, 
size_t span_size,
+                    const IOContext* io_ctx, FileReaderSPtr remote_reader,
+                    std::shared_ptr<ResourceContext> parent_resource_ctx) {
+    // Raw S3 read body.
+    auto do_s3_read = [race, empty_start, span_size, io_ctx, remote_reader]() {
+        auto s3_buf = std::make_unique<char[]>(span_size);
+        size_t read_size = span_size;
+        s3_read_counter << 1;
+        
TEST_SYNC_POINT("CachedRemoteFileReader::_execute_winner_race::s3_before_read");
+        auto st = remote_reader->read_at(empty_start, Slice(s3_buf.get(), 
span_size), &read_size,
+                                         io_ctx);
+        std::unique_lock<bthread::Mutex> lk(race->mtx);
+        race->s3_done = true;
+        race->s3_status = st;
+        if (st.ok() && race->winner < 0) {
+            race->winner = 1;
+            race->s3_buf = std::move(s3_buf);
+        }
+        race->cv.notify_all();
+    };
 
-    const IOContext default_io_ctx;
-    if (io_ctx == nullptr) {
-        io_ctx = &default_io_ctx;
+    // Hedge delay: give peer a head start.
+    bool peer_already_won = false;
+    if (config::peer_race_hedge_delay_ms > 0) {
+        bthread_usleep(static_cast<int64_t>(config::peer_race_hedge_delay_ms) 
* 1000);
+        std::unique_lock<bthread::Mutex> lk(race->mtx);
+        peer_already_won = (race->winner == 0);
+        if (peer_already_won) {
+            race->s3_done = true;
+            race->s3_status = Status::InternalError<false>("skipped: peer won 
during hedge delay");
+            race->cv.notify_all();
+        }
     }
-    DCHECK(io_ctx);
-    const bool is_dryrun = io_ctx->is_dryrun;
-    DCHECK(!closed());
-    if (offset > size()) {
-        return Status::InvalidArgument(
-                fmt::format("offset exceeds file size(offset: {}, file size: 
{}, path: {})", offset,
-                            size(), path().native()));
+
+    if (!peer_already_won) {
+        auto s3_fn = [do_s3_read, parent_resource_ctx]() mutable {
+            std::unique_ptr<AttachTask> attach_task;
+            if (parent_resource_ctx != nullptr) {
+                attach_task = 
std::make_unique<AttachTask>(parent_resource_ctx);
+            }
+            do_s3_read();
+        };
+        auto* s3_pool = ExecEnv::GetInstance()->peer_race_s3_thread_pool();
+        if (s3_pool == nullptr || !s3_pool->submit_func(s3_fn).ok()) {
+            do_s3_read();
+        }
     }
-    size_t bytes_req = result.size;
-    bytes_req = std::min(bytes_req, size() - offset);
-    if (UNLIKELY(bytes_req == 0)) {
-        *bytes_read = 0;
-        return Status::OK();
+}
+
+// Wait for the race to finish and populate the output accordingly.
+Status collect_race_result(std::shared_ptr<RaceState> race, size_t span_size,
+                           std::unique_ptr<char[]>& buffer, PeerFetchResult* 
peer_result,
+                           ReadStatistics& stats, const IOContext* io_ctx) {
+    {
+        std::unique_lock<bthread::Mutex> lk(race->mtx);
+        while (race->winner < 0 && !(race->peer_done && race->s3_done)) {
+            race->cv.wait(lk);
+        }
+        // When S3 wins, the peer bthread may still be finishing its current 
candidate or
+        // recording cooldown state. Wait for that cleanup before returning so 
follow-up reads
+        // observe the completed bookkeeping and test hooks / mock services 
are not torn down
+        // while the peer path is still executing.
+        while (race->winner == 1 && !race->peer_done) {
+            race->cv.wait(lk);
+        }
     }
+    g_active_peer_races.fetch_sub(1, std::memory_order_relaxed);
 
-    ReadStatistics stats;
-    stats.bytes_read += bytes_req;
-    bool read_success = false;
-    MonotonicStopWatch read_at_sw;
-    read_at_sw.start();
-    auto defer_func = [&](int*) {
-        if (config::print_stack_when_cache_miss) {
-            if (io_ctx->file_cache_stats == nullptr && !stats.hit_cache && 
!io_ctx->is_warmup) {
-                LOG_INFO("[verbose] {}", Status::InternalError<true>("not hit 
cache"));
-            }
+    const std::string self_cg_id =
+            
static_cast<CloudClusterInfo*>(ExecEnv::GetInstance()->cluster_info())
+                    ->cloud_compute_group_id();
+
+    if (race->winner == 0) {
+        // Peer won.
+        if (peer_result != nullptr) {
+            *peer_result = std::move(race->peer_res);
         }
-        if (!stats.hit_cache && config::read_cluster_cache_opt_verbose_log) {
-            LOG_INFO(
-                    "[verbose] not hit cache, path: {}, offset: {}, size: {}, 
cost: {} ms, warmup: "
-                    "{}",
-                    path().native(), offset, bytes_req, 
read_at_sw.elapsed_time_milliseconds(),
-                    io_ctx->is_warmup);
+        stats.from_peer_cache = true;
+        g_peer_race_peer_win << 1;
+        const bool is_cross_cg =
+                !race->peer_winner_cg_id.empty() && race->peer_winner_cg_id != 
self_cg_id;
+        if (is_cross_cg) {
+            g_peer_cross_compute_group_read << 1;
+        } else {
+            g_peer_same_compute_group_read << 1;
         }
-        if (is_dryrun) {
-            return;
+        if (io_ctx != nullptr && io_ctx->file_cache_stats != nullptr) {
+            io_ctx->file_cache_stats->num_peer_race_peer_win++;
+            if (is_cross_cg) {
+                io_ctx->file_cache_stats->num_cross_cg_peer_io_total++;
+                io_ctx->file_cache_stats->bytes_read_from_cross_cg_peer += 
span_size;
+            }
         }
-        if (!read_success) {
-            return;
+        return Status::OK();
+    } else if (race->winner == 1) {
+        // S3 won.
+        buffer = std::move(race->s3_buf);
+        stats.from_peer_cache = false;
+        g_peer_race_s3_win << 1;
+        if (io_ctx != nullptr && io_ctx->file_cache_stats != nullptr) {
+            io_ctx->file_cache_stats->num_peer_race_s3_win++;
         }
-        if (!io_ctx->is_warmup) {
-            // update stats increment in this reading procedure for file cache 
metrics
-            FileCacheStatistics fcache_stats_increment;
-            _update_stats(stats, &fcache_stats_increment, 
io_ctx->is_inverted_index);
-            io::FileCacheMetrics::instance().update(&fcache_stats_increment);
+        return Status::OK();
+    }
+    g_peer_race_both_fail << 1;
+    return Status::InternalError<false>("peer race: both peer and s3 failed");
+}
+
+} // anonymous namespace
+
+Status CachedRemoteFileReader::_execute_s3_fallback(size_t empty_start, size_t 
span_size,
+                                                    std::unique_ptr<char[]>& 
buffer,
+                                                    PeerFetchResult* 
peer_result,
+                                                    ReadStatistics& stats,
+                                                    const IOContext* io_ctx) {
+    if (peer_result != nullptr) {
+        peer_result->clear();
+    }
+    buffer.reset(new char[span_size]);
+    size_t read_size = span_size;
+    return execute_s3_read(empty_start, read_size, buffer, stats, io_ctx, 
_remote_file_reader);
+}
+
+Status CachedRemoteFileReader::_execute_sequential_peer_read(
+        const std::vector<FileBlockSPtr>& empty_blocks, size_t empty_start, 
size_t span_size,
+        std::unique_ptr<char[]>& buffer, PeerFetchResult* peer_result, 
ReadStatistics& stats,
+        const IOContext* io_ctx, const std::vector<doris::PeerCandidate>& 
candidates,
+        int64_t tablet_id) {
+    // candidates[0] already reflects last_successful_compute_group_id 
affinity:
+    // get_peer_candidates() applies stable_partition before returning.
+    if (candidates.empty()) {
+        return _execute_s3_fallback(empty_start, span_size, buffer, 
peer_result, stats, io_ctx);
+    }
+
+    auto& manager = get_warm_up_manager();
+    PeerFetchResult serial_res;
+    auto st = execute_peer_read(empty_blocks, &serial_res, path().native(), 
this->size(),
+                                _is_doris_table, stats, io_ctx, 
candidates[0].host,
+                                candidates[0].brpc_port);
+    if (st.ok()) {
+        manager.update_peer_candidate_on_success(tablet_id, 
candidates[0].compute_group_id);
+        if (peer_result != nullptr) {
+            *peer_result = std::move(serial_res);
         }
-        if (io_ctx->file_cache_stats) {
-            // update stats in io_ctx, for query profile
-            _update_stats(stats, io_ctx->file_cache_stats, 
io_ctx->is_inverted_index);
+        return st;
+    }
+    // Track failure so affinity / eviction logic stays consistent with the 
race path.
+    if (st.is<ErrorCode::TOO_MANY_TASKS>()) {
+        // Server healthy but overloaded — don't penalize candidate.
+    } else if (st.is<ErrorCode::NOT_FOUND>()) {
+        manager.rotate_peer_candidate_on_cache_miss(tablet_id, 
candidates[0].host,
+                                                    candidates[0].brpc_port);
+    } else {
+        manager.update_peer_candidate_on_rpc_failure(tablet_id, 
candidates[0].host,
+                                                     candidates[0].brpc_port);
+    }
+    return _execute_s3_fallback(empty_start, span_size, buffer, peer_result, 
stats, io_ctx);
+}
+
+Status CachedRemoteFileReader::_execute_remote_read(const 
std::vector<FileBlockSPtr>& empty_blocks,
+                                                    size_t empty_start, size_t 
span_size,
+                                                    std::unique_ptr<char[]>& 
buffer,
+                                                    PeerFetchResult* 
peer_result,
+                                                    ReadStatistics& stats,
+                                                    const IOContext* io_ctx) {
+    // --- Non-peer path: direct S3 ---
+    if (!_should_read_from_peer(io_ctx)) {
+        return _execute_s3_fallback(empty_start, span_size, buffer, 
peer_result, stats, io_ctx);
+    }
+
+    // --- UT debug point: injected peer address ---
+    DBUG_EXECUTE_IF("PeerFileCacheReader::_fetch_from_peer_cache_blocks", {
+        std::string dp_host = dp->param<std::string>("host", "127.0.0.1");
+        int32_t dp_port = dp->param("port", 9060);
+        buffer.reset();
+        DCHECK(peer_result != nullptr);
+        peer_result->clear();
+        auto st = execute_peer_read(empty_blocks, peer_result, 
path().native(), this->size(),
+                                    _is_doris_table, stats, io_ctx, dp_host, 
dp_port);
+        if (st.ok()) return st;
+        return _execute_s3_fallback(empty_start, span_size, buffer, 
peer_result, stats, io_ctx);
+    });
+
+    // --- Resolve tablet and obtain peer candidates ---
+    int64_t tablet_id = _tablet_id;
+    auto& manager = get_warm_up_manager();
+    auto candidates = manager.get_peer_candidates(tablet_id);
+    if (candidates.empty()) {
+        if (!manager.is_peer_cooldown(tablet_id)) {
+            // Cold miss: trigger background FE fetch and fall back to S3.
+            g_peer_lazy_fetch_triggered << 1;
+            start_bthread([tablet_id]() {
+                auto& mgr = get_warm_up_manager();
+                mgr.fetch_candidates_from_fe(tablet_id);
+            });
         }
-    };
-    std::unique_ptr<int, decltype(defer_func)> defer((int*)0x01, 
std::move(defer_func));
-    if (_is_doris_table && config::enable_read_cache_file_directly) {
-        // read directly
-        SCOPED_RAW_TIMER(&stats.read_cache_file_directly_timer);
-        size_t need_read_size = bytes_req;
-        std::shared_lock lock(_mtx);
-        if (!_cache_file_readers.empty()) {
-            // find the last offset > offset.
-            auto iter = _cache_file_readers.upper_bound(offset);
-            if (iter != _cache_file_readers.begin()) {
-                iter--;
-            }
-            size_t cur_offset = offset;
-            while (need_read_size != 0 && iter != _cache_file_readers.end()) {
-                if (iter->second->offset() > cur_offset ||
-                    iter->second->range().right < cur_offset) {
-                    break;
-                }
-                size_t file_offset = cur_offset - iter->second->offset();
-                size_t reserve_bytes =
-                        std::min(need_read_size, iter->second->range().size() 
- file_offset);
-                if (is_dryrun) [[unlikely]] {
-                    g_skip_local_cache_io_sum_bytes << reserve_bytes;
-                } else {
-                    SCOPED_RAW_TIMER(&stats.local_read_timer);
-                    if (!iter->second
-                                 ->read(Slice(result.data + (cur_offset - 
offset), reserve_bytes),
-                                        file_offset)
-                                 .ok()) { //TODO: maybe read failed because 
block evict, should handle error
-                        break;
-                    }
-                    stats.bytes_read_from_local += reserve_bytes;
-                }
-                _cache->add_need_update_lru_block(iter->second);
-                need_read_size -= reserve_bytes;
-                cur_offset += reserve_bytes;
-                already_read += reserve_bytes;
-                iter++;
-            }
-            if (need_read_size == 0) {
-                *bytes_read = bytes_req;
-                stats.hit_cache = true;
-                g_read_cache_direct_whole_num << 1;
-                g_read_cache_direct_whole_bytes << bytes_req;
-                read_success = true;
-                return Status::OK();
-            } else {
-                g_read_cache_direct_partial_num << 1;
-                g_read_cache_direct_partial_bytes << already_read;
+        return _execute_s3_fallback(empty_start, span_size, buffer, 
peer_result, stats, io_ctx);
+    }
+
+    // --- Dispatch: concurrent race or sequential fallback ---
+    // Candidates are already sorted by last_successful_compute_group_id 
affinity
+    // (stable_partition in get_peer_candidates), so the winner race peer 
bthread
+    // naturally tries the most promising candidate first — whether same-CG or 
cross-CG.
+    if (config::enable_peer_s3_race) {
+        return _execute_winner_race(empty_blocks, empty_start, span_size, 
buffer, peer_result,
+                                    stats, io_ctx, candidates, tablet_id);
+    }
+    return _execute_sequential_peer_read(empty_blocks, empty_start, span_size, 
buffer, peer_result,
+                                         stats, io_ctx, candidates, tablet_id);
+}
+
+Status CachedRemoteFileReader::_execute_winner_race(
+        const std::vector<FileBlockSPtr>& empty_blocks, size_t empty_start, 
size_t span_size,
+        std::unique_ptr<char[]>& buffer, PeerFetchResult* peer_result, 
ReadStatistics& stats,
+        const IOContext* io_ctx, const std::vector<doris::PeerCandidate>& 
candidates,
+        int64_t tablet_id) {
+    // Reserve a race slot; degrade to sequential if at limit.
+    if (g_active_peer_races.fetch_add(1, std::memory_order_relaxed) >=
+        config::max_concurrent_peer_races) {
+        g_active_peer_races.fetch_sub(1, std::memory_order_relaxed);
+        return _execute_sequential_peer_read(empty_blocks, empty_start, 
span_size, buffer,
+                                             peer_result, stats, io_ctx, 
candidates, tablet_id);
+    }
+
+    auto race = std::make_shared<RaceState>();
+
+    // Capture context for child threads.
+    const std::string file_path = path().native();
+    const size_t file_sz = this->size();
+    const bool is_doris = _is_doris_table;
+    auto remote_reader = _remote_file_reader;
+    std::shared_ptr<ResourceContext> parent_resource_ctx;
+    auto* parent_thread_context = thread_context();
+    if (parent_thread_context != nullptr && 
parent_thread_context->is_attach_task()) {
+        parent_resource_ctx = parent_thread_context->resource_ctx();

Review Comment:
   Please handle a failed `start_bthread()` here. The race slot has already 
been reserved; if `bthread_start_background()` fails, no peer worker will ever 
set `peer_done`. `launch_s3_race()` can then set `winner = 1`, and 
`collect_race_result()` waits forever in `while (race->winner == 1 && 
!race->peer_done)`. That also leaks the active race count. On failure, 
decrement the active count and fall back to sequential/S3, or mark the peer 
side done with an error before collecting.



##########
be/src/io/cache/cached_remote_file_reader.cpp:
##########
@@ -158,247 +193,530 @@ std::pair<size_t, size_t> 
CachedRemoteFileReader::s_align_size(size_t offset, si
 }
 
 namespace {
-// Execute S3 read
-Status execute_s3_read(size_t empty_start, size_t& size, 
std::unique_ptr<char[]>& buffer,
-                       ReadStatistics& stats, const IOContext* io_ctx,
-                       FileReaderSPtr remote_file_reader) {
-    s3_read_counter << 1;
-    SCOPED_RAW_TIMER(&stats.remote_read_timer);
-    stats.from_peer_cache = false;
-    return remote_file_reader->read_at(empty_start, Slice(buffer.get(), size), 
&size, io_ctx);
+struct PeerFetchLayout {
+    std::vector<size_t> block_offsets;
+    std::vector<size_t> block_sizes;
+    size_t total_size = 0;
+};
+
+bool contains_file_block(const PeerFetchedBlockSet& fetched_blocks, const 
FileBlockSPtr& block) {
+    return fetched_blocks.contains(block.get());
 }
 
-// Get peer connection info from tablet_id
-std::pair<std::string, int> get_peer_connection_info(int64_t tablet_id,
-                                                     const std::string& 
file_path) {
-    std::string host = "";
-    int port = 0;
+size_t clip_peer_block_size(const FileBlock::Range& range, size_t file_size) {
+    if (range.left >= file_size) {
+        return 0;
+    }
+    return std::min(file_size - range.left, range.size());
+}
 
-    DCHECK(tablet_id > 0);
-    auto& manager = 
ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
-    if (auto tablet_info = manager.get_balanced_tablet_info(tablet_id)) {
-        host = tablet_info->first;
-        port = tablet_info->second;
-    } else {
-        LOG_EVERY_N(WARNING, 100) << "get peer connection info not found"
-                                  << ", tablet_id=" << tablet_id << ", 
file_path=" << file_path;
+PeerFetchLayout build_peer_fetch_layout(const std::vector<FileBlockSPtr>& 
blocks,
+                                        size_t file_size) {
+    PeerFetchLayout layout;
+    layout.block_offsets.reserve(blocks.size());
+    layout.block_sizes.reserve(blocks.size());
+    for (const auto& block : blocks) {
+        const size_t block_size = clip_peer_block_size(block->range(), 
file_size);
+        layout.block_offsets.push_back(layout.total_size);
+        layout.block_sizes.push_back(block_size);
+        layout.total_size += block_size;
     }
+    return layout;
+}
 
-    DBUG_EXECUTE_IF("PeerFileCacheReader::_fetch_from_peer_cache_blocks", {
-        host = dp->param<std::string>("host", "127.0.0.1");
-        port = dp->param("port", 9060);
-        LOG_WARNING("debug point 
PeerFileCacheReader::_fetch_from_peer_cache_blocks")
-                .tag("host", host)
-                .tag("port", port);
-    });
+Status write_peer_payloads_into_block(const FileBlockSPtr& block,
+                                      std::vector<const PeerFetchChunk*>& 
chunks,
+                                      size_t* block_size) {
+    if (block_size == nullptr) {
+        return Status::InvalidArgument("peer block write requires non-null 
block_size");
+    }
+    *block_size = 0;
+    if (chunks.empty()) {
+        return Status::OK();
+    }
+    std::sort(chunks.begin(), chunks.end(),
+              [](const PeerFetchChunk* lhs, const PeerFetchChunk* rhs) {
+                  return lhs->block_offset < rhs->block_offset;
+              });
+    butil::IOBuf payload;
+    for (const auto* chunk : chunks) {
+        *block_size += chunk->payload.length();
+        payload.append(chunk->payload);
+    }
+    DCHECK(*block_size != 0);
+    return block->append_iobuf(payload);
+}
 
-    return {host, port};
+void copy_peer_chunk_to_result(const PeerFetchChunk& chunk, size_t offset, 
size_t right_offset,
+                               size_t already_read, Slice result, size_t& 
indirect_read_bytes,
+                               SourceReadBreakdown& source_read_breakdown) {
+    const size_t payload_size = chunk.payload.length();
+    if (payload_size == 0) {
+        return;
+    }
+    const size_t chunk_left = chunk.block_offset;
+    const size_t chunk_right = chunk_left + payload_size - 1;
+    const size_t copy_left_offset = std::max(offset + already_read, 
chunk_left);
+    const size_t copy_right_offset = std::min(right_offset, chunk_right);
+    if (copy_left_offset > copy_right_offset) {
+        return;
+    }
+    const size_t copy_offset = copy_left_offset - chunk_left;
+    const size_t copy_size = copy_right_offset - copy_left_offset + 1;
+    char* dst = result.data + (copy_left_offset - offset);
+    chunk.payload.copy_to(dst, copy_size, copy_offset);
+    indirect_read_bytes += copy_size;
+    source_read_breakdown.peer_bytes += copy_size;
 }
 
-// Execute peer read with fallback to S3
-// file_size is the size of the file
-// used to calculate the rightmost boundary value of the block, due to 
inaccurate current block meta information.
-Status execute_peer_read(const std::vector<FileBlockSPtr>& empty_blocks, 
size_t empty_start,
-                         size_t& size, std::unique_ptr<char[]>& buffer,
-                         const std::string& file_path, size_t file_size, bool 
is_doris_table,
-                         int64_t tablet_id, ReadStatistics& stats, const 
IOContext* io_ctx) {
-    auto [host, port] = get_peer_connection_info(tablet_id, file_path);
+// Execute peer read targeting a specific host:port.
+Status execute_peer_read(const std::vector<FileBlockSPtr>& empty_blocks,
+                         PeerFetchResult* peer_result, const std::string& 
file_path,
+                         size_t file_size, bool is_doris_table, 
ReadStatistics& stats,
+                         const IOContext* io_ctx, const std::string& host, 
int32_t port) {
     VLOG_DEBUG << "PeerFileCacheReader read from peer, host=" << host << ", 
port=" << port
                << ", file_path=" << file_path;
 
     if (host.empty() || port == 0) {
         g_failed_get_peer_addr_counter << 1;
-        VLOG_DEBUG << "PeerFileCacheReader host or port is empty"
-                   << ", host=" << host << ", port=" << port << ", file_path=" 
<< file_path;
+        LOG_EVERY_N(WARNING, 100) << "PeerFileCacheReader host or port is 
empty"
+                                  << ", host=" << host << ", port=" << port
+                                  << ", file_path=" << file_path;
         return Status::InternalError<false>("host or port is empty");
     }
     SCOPED_RAW_TIMER(&stats.peer_read_timer);
     peer_read_counter << 1;
     PeerFileCacheReader peer_reader(file_path, is_doris_table, host, port);
-    auto st = peer_reader.fetch_blocks(empty_blocks, empty_start, 
Slice(buffer.get(), size), &size,
-                                       file_size, io_ctx);
+    // Serial peer read: source BE has the data from rebalance warm-up; no 
fill needed.
+    auto st = peer_reader.fetch_blocks(empty_blocks, peer_result, file_size, 
io_ctx,
+                                       /*request_fill=*/false);
     if (!st.ok()) {
-        VLOG_DEBUG << "PeerFileCacheReader read from peer failed"
-                   << ", host=" << host << ", port=" << port << ", error=" << 
st.msg();
+        LOG_WARNING("PeerFileCacheReader read from peer failed")
+                .tag("host", host)
+                .tag("port", port)
+                .tag("error", st.msg());
     }
-    stats.from_peer_cache = true;
+    stats.from_peer_cache = st.ok();
     return st;
 }
 
-} // anonymous namespace
+// Execute S3 read
+Status execute_s3_read(size_t empty_start, size_t& size, 
std::unique_ptr<char[]>& buffer,
+                       ReadStatistics& stats, const IOContext* io_ctx,
+                       FileReaderSPtr remote_file_reader) {
+    s3_read_counter << 1;
+    SCOPED_RAW_TIMER(&stats.remote_read_timer);
+    stats.from_peer_cache = false;
+    return remote_file_reader->read_at(empty_start, Slice(buffer.get(), size), 
&size, io_ctx);
+}
 
-Status CachedRemoteFileReader::_execute_remote_read(const 
std::vector<FileBlockSPtr>& empty_blocks,
-                                                    size_t empty_start, 
size_t& size,
-                                                    std::unique_ptr<char[]>& 
buffer,
-                                                    ReadStatistics& stats,
-                                                    const IOContext* io_ctx) {
-    DBUG_EXECUTE_IF("CachedRemoteFileReader.read_at_impl.change_type", {
-        // Determine read type from debug point or default to S3
-        std::string read_type = "s3";
-        read_type = dp->param<std::string>("type", "s3");
-        LOG_WARNING("CachedRemoteFileReader.read_at_impl.change_type")
-                .tag("path", path().native())
-                .tag("off", empty_start)
-                .tag("size", size)
-                .tag("type", read_type);
-        // Execute appropriate read strategy
-        if (read_type == "s3") {
-            return execute_s3_read(empty_start, size, buffer, stats, io_ctx, 
_remote_file_reader);
-        } else {
-            return execute_peer_read(empty_blocks, empty_start, size, buffer, 
path().native(),
-                                     this->size(), _is_doris_table, 
_tablet_id, stats, io_ctx);
+CloudWarmUpManager& get_warm_up_manager() {
+    return 
ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
+}
+
+// Shared state for peer-vs-S3 winner race.
+// Uses bthread primitives — never std::mutex/condition_variable in bthread 
context.
+struct RaceState {
+    bthread::Mutex mtx;
+    bthread::ConditionVariable cv;
+    int winner = -1; // 0=peer won, 1=s3 won, -1=undecided, -2=both failed
+    bool peer_done = false;
+    bool s3_done = false;
+    Status peer_status;
+    Status s3_status;
+    std::unique_ptr<char[]> s3_buf;
+    PeerFetchResult peer_res;
+    std::string peer_winner_cg_id; // compute_group_id of the winning peer 
candidate
+};
+
+// Peer race logic: try candidates sequentially until one succeeds or all fail.
+void run_peer_race(std::shared_ptr<RaceState> race, std::vector<FileBlockSPtr> 
empty_blocks,
+                   const std::string& file_path, size_t file_sz, bool is_doris,
+                   const IOContext* io_ctx, std::vector<doris::PeerCandidate> 
candidates,
+                   int64_t tablet_id, std::shared_ptr<ResourceContext> 
parent_resource_ctx) {
+    std::unique_ptr<AttachTask> attach_task;
+    if (parent_resource_ctx != nullptr) {
+        attach_task = std::make_unique<AttachTask>(parent_resource_ctx);
+    }
+
+    auto& manager = get_warm_up_manager();
+    bool all_tried = true;
+
+    for (size_t i = 0; i < candidates.size(); ++i) {
+        // Before issuing the next RPC, check if S3 already won.
+        if (i > 0) {
+            TEST_SYNC_POINT("run_peer_race::between_candidates");
+            std::unique_lock<bthread::Mutex> lk(race->mtx);
+            if (race->winner > 0) {
+                // S3 already won — stop, but not all candidates were tried.
+                all_tried = false;
+                break;
+            }
         }
-    });
 
-    if (!doris::config::is_cloud_mode() || !_is_doris_table || 
io_ctx->is_warmup ||
-        !doris::config::enable_cache_read_from_peer) {
-        return execute_s3_read(empty_start, size, buffer, stats, io_ctx, 
_remote_file_reader);
-    } else {
-        // first try peer read, if peer failed, fallback to S3
-        // peer timeout is 5 seconds
-        // TODO(dx): here peer and s3 reader need to get data in parallel, and 
take the one that is correct and returns first
-        // ATTN: Save original size before peer read, as it may be modified by 
fetch_blocks, read peer ref size
-        size_t original_size = size;
-        auto st = execute_peer_read(empty_blocks, empty_start, size, buffer, 
path().native(),
-                                    this->size(), _is_doris_table, _tablet_id, 
stats, io_ctx);
-        if (!st.ok()) {
-            // Restore original size for S3 fallback, as peer read may have 
modified it
-            size = original_size;
-            // Fallback to S3
-            return execute_s3_read(empty_start, size, buffer, stats, io_ctx, 
_remote_file_reader);
+        const auto& cand = candidates[i];
+        peer_read_counter << 1;
+        PeerFileCacheReader peer_reader(file_path, is_doris, cand.host, 
cand.brpc_port);
+        PeerFetchResult local_peer_res;
+        const bool request_fill = 
!config::peer_cache_fill_compute_group_id.empty() &&
+                                  cand.compute_group_id == 
config::peer_cache_fill_compute_group_id;
+        auto st = peer_reader.fetch_blocks(empty_blocks, &local_peer_res, 
file_sz, io_ctx,
+                                           request_fill, tablet_id);
+        if (st.ok()) {
+            manager.update_peer_candidate_on_success(tablet_id, 
cand.compute_group_id);
+            std::unique_lock<bthread::Mutex> lk(race->mtx);
+            if (race->winner < 0) {
+                race->winner = 0;
+                race->peer_res = std::move(local_peer_res);
+                race->peer_winner_cg_id = cand.compute_group_id;
+            }
+            race->peer_done = true;
+            race->peer_status = Status::OK();
+            race->cv.notify_all();
+            return;
+        }
+
+        // Handle per-candidate failure.
+        if (st.template is<ErrorCode::TOO_MANY_TASKS>()) {
+            all_tried = false;
+            break;
+        }
+        if (st.template is<ErrorCode::NOT_FOUND>()) {
+            manager.rotate_peer_candidate_on_cache_miss(tablet_id, cand.host, 
cand.brpc_port);
+        } else {
+            manager.update_peer_candidate_on_rpc_failure(tablet_id, cand.host, 
cand.brpc_port);
         }
-        return st;
     }
+
+    if (all_tried) {
+        manager.record_peer_all_miss(tablet_id);
+    }
+    std::unique_lock<bthread::Mutex> lk(race->mtx);
+    race->peer_done = true;
+    race->peer_status = Status::InternalError<false>("peer: all candidates 
failed");
+    if (race->winner < 0 && race->s3_done) {
+        race->winner = race->s3_status.ok() ? 1 : -2;
+    }
+    race->cv.notify_all();
 }
 
-Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, 
size_t* bytes_read,
-                                            const IOContext* io_ctx) {
-    size_t already_read = 0;
-    
SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().cached_remote_reader_read_at);
+// Apply hedge delay, then submit S3 read to the thread pool (or run inline).
+void launch_s3_race(std::shared_ptr<RaceState> race, size_t empty_start, 
size_t span_size,
+                    const IOContext* io_ctx, FileReaderSPtr remote_reader,
+                    std::shared_ptr<ResourceContext> parent_resource_ctx) {
+    // Raw S3 read body.
+    auto do_s3_read = [race, empty_start, span_size, io_ctx, remote_reader]() {
+        auto s3_buf = std::make_unique<char[]>(span_size);

Review Comment:
   This S3 task can outlive `read_at_impl()` when the peer side wins. 
`collect_race_result()` returns immediately for `winner == 0`, but this lambda 
still captures the raw `IOContext*` and passes it into 
`remote_reader->read_at()`. When `read_at_impl()` used its local 
`default_io_ctx`, or when the caller owns a request-scoped context, the 
peer-win path can return and leave the background S3 read using a dangling 
pointer. Please either keep the S3 branch joined/cancelled before returning, or 
give the S3 task an owned context whose referenced fields remain valid until 
the task finishes.



##########
be/src/cloud/cloud_internal_service.cpp:
##########
@@ -237,38 +316,224 @@ Status read_file_block(const 
std::shared_ptr<io::FileBlock>& file_block, size_t
     }
     size_t read_size = std::min(static_cast<size_t>(file_size - 
file_block->offset()),
                                 file_block->range().size());
-    data.resize(read_size);
+    output->set_block_offset(static_cast<int64_t>(file_block->offset()));
+    output->set_block_size(static_cast<int64_t>(read_size));
+    if (read_size == 0) {
+        return Status::OK();
+    }
+
+    Status read_st = Status::OK();
+    // Attachment payload mode: protobuf carries metadata only, payload goes 
to attachment.
+    // This allows FS cache to use a file-descriptor->IOBuf path directly.
+    if (response_attachment != nullptr) {
+        size_t bytes_read = 0;
+        auto begin_read_file_ts = 
std::chrono::duration_cast<std::chrono::microseconds>(
+                                          
std::chrono::steady_clock::now().time_since_epoch())
+                                          .count();
+        SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
+        read_st = file_block->read_to_iobuf(response_attachment, 
/*read_offset=*/0, read_size,
+                                            &bytes_read);
+        auto end_read_file_ts = 
std::chrono::duration_cast<std::chrono::microseconds>(
+                                        
std::chrono::steady_clock::now().time_since_epoch())
+                                        .count();
+        g_file_cache_get_by_peer_read_cache_file_latency << (end_read_file_ts 
- begin_read_file_ts);
+
+        if (read_st.ok()) {
+            if (bytes_read != read_size) {
+                return Status::InternalError<false>(
+                        "peer cache read short data, expected={}, actual={}", 
read_size,
+                        bytes_read);
+            }
+            g_file_cache_get_by_peer_response_bytes_total << bytes_read;
+            return Status::OK();
+        }
+    } else {
+        std::string data;
+        data.resize(read_size);
+        auto begin_read_file_ts = 
std::chrono::duration_cast<std::chrono::microseconds>(
+                                          
std::chrono::steady_clock::now().time_since_epoch())
+                                          .count();
+        SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
+        Slice slice(data.data(), data.size());
+        read_st = file_block->read(slice, /*read_offset=*/0);
+        auto end_read_file_ts = 
std::chrono::duration_cast<std::chrono::microseconds>(
+                                        
std::chrono::steady_clock::now().time_since_epoch())
+                                        .count();
+        g_file_cache_get_by_peer_read_cache_file_latency << (end_read_file_ts 
- begin_read_file_ts);
+
+        if (read_st.ok()) {
+            auto set_data_start = std::chrono::steady_clock::now();
+            output->set_data(std::move(data));
+            set_data_us = elapsed_us(set_data_start);
+            g_file_cache_get_by_peer_response_bytes_total << read_size;
+            return Status::OK();
+        }
+    }
+
+    g_file_cache_get_by_peer_failed_num << 1;
+    LOG(WARNING) << "read cache block failed: " << read_st;
+    return read_st;
+}
 
-    auto begin_read_file_ts = 
std::chrono::duration_cast<std::chrono::microseconds>(
-                                      
std::chrono::steady_clock::now().time_since_epoch())
-                                      .count();
+// Trigger S3 -> local cache fill for the given file block.
+// Returns OK when the block is DOWNLOADED after the fill.
+// Returns TOO_MANY_TASKS when the fill slot is exhausted (server healthy but 
overloaded):
+//   client should not rotate or evict, just fall back to S3 and retry same 
candidate later.
+// Returns NOT_FOUND for soft misses (tablet not found, fill incomplete, 
timeout):
+//   client should rotate the candidate to try a different CG next time.
+// fill_tablet_id: tablet id from PFetchPeerDataRequest.fill_tablet_id; used 
to look up the
+//   tablet and reconstruct the remote path server-side (avoids trusting 
client-supplied paths).
+// filename: PFetchPeerDataRequest.path(), the cache-key filename used to 
match the segment.
+Status trigger_peer_server_fill(io::FileBlockSPtr& fb, int64_t fill_tablet_id,
+                                const std::string& filename, int64_t 
file_size, int64_t offset,
+                                int64_t size, int32_t timeout_ms) {
+    g_peer_server_fill_requested << 1;
 
-    SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
-    Slice slice(data.data(), data.size());
-    Status read_st = file_block->read(slice, /*read_offset=*/0);
+    // Concurrency guard: atomically reserve a fill slot.
+    // Excess requests are rejected so the client falls back to its own S3 
read.
+    // Return NOT_FOUND so the client rotates the candidate instead of 
evicting it.
+    if (g_active_server_fills.fetch_add(1, std::memory_order_relaxed) >=
+        config::max_concurrent_peer_server_fills) {
+        g_active_server_fills.fetch_sub(1, std::memory_order_relaxed);
+        g_peer_server_fill_rejected << 1;
+        VLOG_DEBUG << "trigger_peer_server_fill: rejected (concurrency limit "
+                   << config::max_concurrent_peer_server_fills << "), 
tablet_id=" << fill_tablet_id;
+        // TOO_MANY_TASKS: server is healthy but overloaded. Client must not 
rotate or evict;
+        // just fall back to S3 for this request and retry the same candidate 
next time.
+        return Status::Error<ErrorCode::TOO_MANY_TASKS, false>("fill slot 
exhausted");
+    }
+    // RAII decrement: runs on every return path below.
+    Defer fill_guard {[]() { g_active_server_fills.fetch_sub(1, 
std::memory_order_relaxed); }};
 
-    auto end_read_file_ts = 
std::chrono::duration_cast<std::chrono::microseconds>(
-                                    
std::chrono::steady_clock::now().time_since_epoch())
-                                    .count();
-    g_file_cache_get_by_peer_read_cache_file_latency << (end_read_file_ts - 
begin_read_file_ts);
+    auto& cloud_engine = ExecEnv::GetInstance()->storage_engine().to_cloud();
+    auto tablet_res = cloud_engine.tablet_mgr().get_tablet(fill_tablet_id);
+    if (!tablet_res.has_value()) {
+        LOG(WARNING) << "trigger_peer_server_fill: tablet not found, 
tablet_id=" << fill_tablet_id;
+        g_peer_server_fill_rejected << 1;
+        return Status::NotFound<false>("fill: tablet not found");
+    }
+    auto tablet = tablet_res.value();
+
+    // Iterate rowsets and segments to find the one whose remote path filename 
matches the
+    // cache-key filename sent by the client. The server reconstructs the full 
remote path
+    // from its own tablet metadata, keeping path layout authoritative on the 
server side.
+    io::FileSystemSPtr fs;
+    std::string remote_path;
+    auto rowsets = tablet->get_snapshot_rowset(false);
+    for (const auto& rs : rowsets) {
+        if (!remote_path.empty()) break;

Review Comment:
   Server-side fill currently only searches `.dat` segment paths. The peer 
cache reader can request inverted-index cache blocks too, and those filenames 
are produced by `remote_idx_*_path(...)`, so an `.idx` request with 
`request_cache_fill=true` will never find `remote_path` and will always return 
NOT_FOUND/fall back to the requester's S3 read. Please carry or derive the file 
type for the fill request and resolve index paths as well, or explicitly skip 
fill before sending `request_cache_fill` for index files.



##########
be/src/util/bthread_utils.h:
##########
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <bthread/bthread.h>
+
+#include <functional>
+#include <memory>
+
+#include "runtime/thread_context.h"
+
+namespace doris {
+
+// Launch a callable in a background bthread (fire-and-forget).
+// The callable is heap-allocated and deleted after execution.
+// Returns true on success, false if bthread creation failed (callable is 
still deleted).
+//
+// init_thread_ctx: when true, initialises the bthread's thread-local 
ThreadContext via
+// ScopedInitThreadContext before invoking fn.  Set this when the bthread 
needs memory-tracker
+// or workload-group context (e.g. attaches an AttachTask inside fn).  
Defaults to false so
+// existing call-sites that do not need thread-context initialisation are 
unaffected.
+template <typename Fn>
+bool start_bthread(Fn&& fn, bool init_thread_ctx = false, const 
bthread_attr_t* attr = nullptr) {
+    struct Args {
+        std::function<void()> fn;
+        bool init_thread_ctx;
+    };
+    auto args = std::make_unique<Args>(Args {std::forward<Fn>(fn), 
init_thread_ctx});
+    auto entry = [](void* arg) -> void* {
+        // Reclaim ownership so Args is deleted when this scope exits.
+        auto a = std::unique_ptr<Args>(reinterpret_cast<Args*>(arg));
+        std::optional<ScopedInitThreadContext> scoped;

Review Comment:
   `std::optional` is used in this public header but `<optional>` is not 
included. The current include chain happens to pull it in before 
`bthread_utils.h`, but the header is not self-contained and will fail to 
compile when included from a translation unit that does not already include 
`<optional>`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to