This is an automated email from the ASF dual-hosted git repository.
hellostephen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 799057112d5 [fix](cloud) Fix warmup inflight count not decremented on
early return (#60480)
799057112d5 is described below
commit 799057112d512de84c38ab2c80f1e1b2999676c0
Author: deardeng <[email protected]>
AuthorDate: Wed Feb 11 12:58:37 2026 +0800
[fix](cloud) Fix warmup inflight count not decremented on early return
(#60480)
## Proposed changes
### Problem
During cloud tablet decommission, some tablets take an unexpectedly long
time (5+ minutes) to migrate because FE keeps waiting for warmup tasks
to complete, even though the tasks have already failed on the BE side.
**Root cause**: In
`FileCacheBlockDownloader::download_file_cache_block()`, when an early
return occurs (e.g., tablet not found, rowset not found, storage
resource error), the `_inflight_tablets` count is not decremented. This
causes:
1. `check_download_task()` always returns `done=false` for these tablets
2. FE's `checkInflightWarmUpCacheAsync()` waits until timeout (default
300 seconds)
3. Tablet migration is blocked unnecessarily
**Example log showing the issue**:
```
W download_file_cache_block: tablet_id=1769675033824 rowset_id not found,
rowset_id=020000000010fa85...
```
After this warning, the tablet's entry remains in the
`_inflight_tablets` map with a non-zero count, causing the 5-minute wait
before FE times out and proceeds.
### Solution
1. Extract the inflight count decrement logic into a reusable lambda
`decrease_inflight_count`
2. Call `decrease_inflight_count()` in all early return paths:
- When `get_tablet()` fails
- When `rowset_id` is not found
- When `remote_storage_resource()` fails
- When the file system for the rowset cannot be obtained
3. Refactor `download_done` callback to reuse `decrease_inflight_count`,
eliminating code duplication
4. Capture `decrease_inflight_count` by value in the `download_done`
lambda so the callback stays valid if it is ever called asynchronously
in the future (the copy still holds a raw `this` pointer, so the
downloader itself must outlive the callback)
5. Add unit tests to verify inflight count is correctly decremented on
failures
## Further comments
This bug also causes a minor memory leak: entries in the
`_inflight_tablets` map are never cleaned up when warmup fails, slowly
accumulating over time (cleared on BE restart).
---
be/src/io/cache/block_file_cache_downloader.cpp | 48 +++--
.../io/cache/block_file_cache_downloader_test.cpp | 240 +++++++++++++++++++++
2 files changed, 272 insertions(+), 16 deletions(-)
diff --git a/be/src/io/cache/block_file_cache_downloader.cpp
b/be/src/io/cache/block_file_cache_downloader.cpp
index bcd21afe725..9b318779002 100644
--- a/be/src/io/cache/block_file_cache_downloader.cpp
+++ b/be/src/io/cache/block_file_cache_downloader.cpp
@@ -181,9 +181,34 @@ void FileCacheBlockDownloader::download_file_cache_block(
<< ", rowset_id=" << meta.rowset_id() << ", segment_id=" <<
meta.segment_id()
<< ", offset=" << meta.offset() << ", size=" << meta.size()
<< ", type=" << meta.cache_type();
+
+ // Helper that drops one inflight reference for this meta's tablet; called before each early return shown in this change and from download_done.
+ // NOTE: This lambda captures 'this' pointer. It's safe because:
+ // 1. download_segment_file() calls download_done synchronously
+ // 2. ~FileCacheBlockDownloader() waits for all workers to finish via
_workers->shutdown()
+ // If this assumption changes (e.g., async callback), consider using
shared_from_this pattern.
+ auto decrease_inflight_count = [this, tablet_id = meta.tablet_id()]() {
+ std::lock_guard lock(_inflight_mtx); // held while _inflight_tablets is searched and modified below
+ auto it = _inflight_tablets.find(tablet_id);
+ if (it == _inflight_tablets.end()) {
+ LOG(WARNING) << "inflight ref cnt not exist, tablet id " <<
tablet_id;
+ } else {
+ it->second--;
+ VLOG_DEBUG << "download_file_cache_block: inflight_tablets["
<< tablet_id
+ << "] = " << it->second;
+ if (it->second <= 0) { // no references left for this tablet: remove its map entry
+ DCHECK_EQ(it->second, 0) << it->first;
+ _inflight_tablets.erase(it);
+ VLOG_DEBUG << "download_file_cache_block: erase
inflight_tablets[" << tablet_id
+ << "]";
+ }
+ }
+ };
+
CloudTabletSPtr tablet;
if (auto res = _engine.tablet_mgr().get_tablet(meta.tablet_id(),
false); !res.has_value()) {
LOG(INFO) << "failed to find tablet " << meta.tablet_id() << " : "
<< res.error();
+ decrease_inflight_count();
return;
} else {
tablet = std::move(res).value();
@@ -202,12 +227,14 @@ void FileCacheBlockDownloader::download_file_cache_block(
if (find_it == id_to_rowset_meta_map.end()) {
LOG(WARNING) << "download_file_cache_block: tablet_id=" <<
meta.tablet_id()
<< " rowset_id not found, rowset_id=" <<
meta.rowset_id();
+ decrease_inflight_count();
return;
}
auto storage_resource = find_it->second->remote_storage_resource();
if (!storage_resource) {
LOG(WARNING) << storage_resource.error();
+ decrease_inflight_count();
return;
}
// Use RowsetMeta::fs() instead of storage_resource->fs to support
packed file.
@@ -218,26 +245,15 @@ void FileCacheBlockDownloader::download_file_cache_block(
if (!file_system) {
LOG(WARNING) << "download_file_cache_block: failed to get file
system for tablet_id="
<< meta.tablet_id() << ", rowset_id=" <<
meta.rowset_id();
+ decrease_inflight_count();
return;
}
- auto download_done = [&, tablet_id = meta.tablet_id()](Status st) {
- std::lock_guard lock(_inflight_mtx);
- auto it = _inflight_tablets.find(tablet_id);
+ // Capture decrease_inflight_count by value to ensure lifetime safety
+ // even if download_done is called asynchronously in the future
+ auto download_done = [decrease_inflight_count, tablet_id =
meta.tablet_id()](Status st) {
TEST_SYNC_POINT_CALLBACK("FileCacheBlockDownloader::download_file_cache_block");
- if (it == _inflight_tablets.end()) {
- LOG(WARNING) << "inflight ref cnt not exist, tablet id " <<
tablet_id;
- } else {
- it->second--;
- VLOG_DEBUG << "download_file_cache_block: inflight_tablets["
<< tablet_id
- << "] = " << it->second;
- if (it->second <= 0) {
- DCHECK_EQ(it->second, 0) << it->first;
- _inflight_tablets.erase(it);
- VLOG_DEBUG << "download_file_cache_block: erase
inflight_tablets[" << tablet_id
- << "]";
- }
- }
+ decrease_inflight_count();
LOG(INFO) << "download_file_cache_block: download_done,
tablet_Id=" << tablet_id
<< " status=" << st.to_string();
};
diff --git a/be/test/io/cache/block_file_cache_downloader_test.cpp
b/be/test/io/cache/block_file_cache_downloader_test.cpp
new file mode 100644
index 00000000000..79974ef4860
--- /dev/null
+++ b/be/test/io/cache/block_file_cache_downloader_test.cpp
@@ -0,0 +1,240 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/cache/block_file_cache_downloader.h"
+
+#include <gen_cpp/internal_service.pb.h>
+#include <gtest/gtest.h>
+
+#include <chrono>
+#include <thread>
+
+#include "cloud/cloud_storage_engine.h"
+#include "common/config.h"
+
+namespace doris::io {
+
+class FileCacheBlockDownloaderTest : public testing::Test { // Fixture owning the CloudStorageEngine that each test passes to its own FileCacheBlockDownloader
+public:
+ FileCacheBlockDownloaderTest() : _engine(CloudStorageEngine(EngineOptions
{})) {}
+
+ void SetUp() override {
+ // Turn the file cache on before each test (TearDown switches it off again)
+ config::enable_file_cache = true;
+ // Bound the downloader thread pool to 2..4 threads; NOTE(review): TearDown does not restore these two settings
+ config::file_cache_downloader_thread_num_min = 2;
+ config::file_cache_downloader_thread_num_max = 4;
+ }
+
+ void TearDown() override { config::enable_file_cache = false; } // NOTE(review): forces false rather than restoring the value seen before SetUp
+
+ // Polls check_download_task() every 100 ms until tablet_id is reported done; returns false once timeout_seconds have elapsed
+ bool wait_for_task_done(FileCacheBlockDownloader& downloader, int64_t
tablet_id,
+ int timeout_seconds = 10) {
+ auto start = std::chrono::steady_clock::now();
+ while (true) {
+ std::vector<int64_t> tablets = {tablet_id};
+ std::map<int64_t, bool> done;
+ downloader.check_download_task(tablets, &done);
+
+ if (done.contains(tablet_id) && done[tablet_id]) {
+ return true;
+ }
+
+ auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
+ std::chrono::steady_clock::now() - start)
+ .count();
+ if (elapsed >= timeout_seconds) {
+ return false;
+ }
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
+ }
+
+ // Same polling loop for several tablets: returns true only when every id in tablet_ids is reported done by one check_download_task() call
+ bool wait_for_tasks_done(FileCacheBlockDownloader& downloader,
+ const std::vector<int64_t>& tablet_ids, int
timeout_seconds = 10) {
+ auto start = std::chrono::steady_clock::now();
+ while (true) {
+ std::map<int64_t, bool> done;
+ downloader.check_download_task(tablet_ids, &done);
+
+ bool all_done = true;
+ for (int64_t tablet_id : tablet_ids) {
+ if (!done.contains(tablet_id) || !done[tablet_id]) {
+ all_done = false;
+ break;
+ }
+ }
+
+ if (all_done) {
+ return true;
+ }
+
+ auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
+ std::chrono::steady_clock::now() - start)
+ .count();
+ if (elapsed >= timeout_seconds) {
+ return false;
+ }
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
+ }
+
+protected:
+ CloudStorageEngine _engine;
+};
+
+// Test that inflight count is correctly decremented when tablet is not found
+// This tests the bug fix where early return did not decrement inflight count
+TEST_F(FileCacheBlockDownloaderTest,
TestInflightCountDecrementOnTabletNotFound) {
+ FileCacheBlockDownloader downloader(_engine);
+
+ // Build a single-meta task whose tablet id this test never registers with the engine
+ DownloadTask::FileCacheBlockMetaVec metas;
+ auto* meta = metas.Add();
+ meta->set_tablet_id(99999999); // Non-existent tablet ID
+ meta->set_rowset_id("non_existent_rowset");
+ meta->set_segment_id(0);
+ meta->set_offset(0);
+ meta->set_size(1024);
+
+ DownloadTask task(std::move(metas));
+
+ // Submit the task - this increments inflight count
+ downloader.submit_download_task(std::move(task));
+
+ // Poll check_download_task() until the tablet is reported done or the helper's default 10 s timeout expires
+ bool done = wait_for_task_done(downloader, 99999999);
+
+ // The task should be marked as done because inflight count was decremented
+ EXPECT_TRUE(done) << "Inflight count should be decremented even when
tablet not found";
+}
+
+// Test that inflight count is correctly decremented for multiple metas
+// spread over five different tablets; every meta here targets a non-existent tablet, so all of them fail
+TEST_F(FileCacheBlockDownloaderTest,
TestInflightCountDecrementOnMultipleMetasFail) {
+ FileCacheBlockDownloader downloader(_engine);
+
+ // Create a download task with multiple non-existent tablets
+ DownloadTask::FileCacheBlockMetaVec metas;
+
+ std::vector<int64_t> tablet_ids;
+ // One meta per tablet id, 88888880 through 88888884; the ids are also collected for the wait below
+ for (int i = 0; i < 5; i++) {
+ auto* meta = metas.Add();
+ int64_t tablet_id = 88888880 + i;
+ meta->set_tablet_id(tablet_id); // Non-existent tablet IDs
+ meta->set_rowset_id("non_existent_rowset_" + std::to_string(i));
+ meta->set_segment_id(0);
+ meta->set_offset(0);
+ meta->set_size(1024);
+ tablet_ids.push_back(tablet_id);
+ }
+
+ DownloadTask task(std::move(metas));
+
+ // Submit the task
+ downloader.submit_download_task(std::move(task));
+
+ // Poll until all five tablet ids are reported done in the same check, or the helper times out
+ bool all_done = wait_for_tasks_done(downloader, tablet_ids);
+
+ // All tasks should be marked as done
+ EXPECT_TRUE(all_done) << "All inflight counts should be decremented for
non-existent tablets";
+}
+
+// Test that inflight count is correctly decremented for same tablet with
multiple blocks
+TEST_F(FileCacheBlockDownloaderTest,
TestInflightCountDecrementSameTabletMultipleBlocks) {
+ FileCacheBlockDownloader downloader(_engine);
+
+ // Three metas share one tablet id and rowset id; they differ only in segment id and offset
+ DownloadTask::FileCacheBlockMetaVec metas;
+
+ int64_t tablet_id = 77777777;
+ for (int i = 0; i < 3; i++) {
+ auto* meta = metas.Add();
+ meta->set_tablet_id(tablet_id); // Same tablet
+ meta->set_rowset_id("non_existent_rowset");
+ meta->set_segment_id(i);
+ meta->set_offset(i * 1024);
+ meta->set_size(1024);
+ }
+
+ DownloadTask task(std::move(metas));
+
+ // Submit the task - this increments inflight count by 3 for the same
tablet
+ downloader.submit_download_task(std::move(task));
+
+ // Poll until the tablet is reported done or the helper's default timeout expires
+ bool done = wait_for_task_done(downloader, tablet_id);
+
+ // The task should be marked as done
+ // All 3 blocks should have decremented the count, resulting in removal
from map
+ EXPECT_TRUE(done) << "Inflight count should be zero after all blocks
processed for tablet "
+ << tablet_id;
+}
+
+// Test check_download_task returns true for tablets that were never submitted
+TEST_F(FileCacheBlockDownloaderTest, TestCheckDownloadTaskNonExistentTablet) {
+ FileCacheBlockDownloader downloader(_engine);
+
+ // Query a tablet id for which no download task was submitted to this downloader
+ std::vector<int64_t> tablets = {11111111};
+ std::map<int64_t, bool> done;
+ downloader.check_download_task(tablets, &done);
+
+ // Should return true (done) because it's not in inflight map
+ ASSERT_TRUE(done.contains(11111111)); // ASSERT rather than EXPECT: stops here so the operator[] below never inserts a missing key
+ EXPECT_TRUE(done[11111111]) << "Non-existent tablet should be reported as
done";
+}
+
+// Test check_download_task before submission and after completion; the intermediate in-flight (not done) state is not asserted
+TEST_F(FileCacheBlockDownloaderTest, TestCheckDownloadTaskInflightStatus) {
+ FileCacheBlockDownloader downloader(_engine);
+
+ int64_t tablet_id = 66666666;
+
+ // Initially, should be done (not in inflight)
+ {
+ std::vector<int64_t> tablets = {tablet_id};
+ std::map<int64_t, bool> done;
+ downloader.check_download_task(tablets, &done);
+ EXPECT_TRUE(done[tablet_id]) << "Initially tablet should not be in
inflight";
+ }
+
+ // Submit a single-meta task for that tablet id
+ DownloadTask::FileCacheBlockMetaVec metas;
+ auto* meta = metas.Add();
+ meta->set_tablet_id(tablet_id);
+ meta->set_rowset_id("non_existent_rowset");
+ meta->set_segment_id(0);
+ meta->set_offset(0);
+ meta->set_size(1024);
+
+ DownloadTask task(std::move(metas));
+ downloader.submit_download_task(std::move(task));
+
+ // After task completes (should be quick since tablet doesn't exist)
+ bool done = wait_for_task_done(downloader, tablet_id);
+ EXPECT_TRUE(done) << "Task should complete with decremented inflight
count";
+}
+
+} // namespace doris::io
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]