This is an automated email from the ASF dual-hosted git repository.
airborne pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 664f942576d [codex] fix ANN OpenMP build budget and add concurrency
test (#61313)
664f942576d is described below
commit 664f942576dc14ef3f93baa07a4e6980a641f03d
Author: zhiqiang <[email protected]>
AuthorDate: Tue Mar 24 13:41:48 2026 +0800
[codex] fix ANN OpenMP build budget and add concurrency test (#61313)
## Summary
This PR fixes ANN index build OpenMP thread budgeting and adds a BE unit
test for the concurrency cap.
## Problem
`ScopedOmpThreadBudget` computed the available budget from
`config::omp_threads_limit` directly. With the default
`omp_threads_limit = -1`, each builder effectively reserved 1 thread and
did not follow the documented auto behavior (80% of CPU cores). In
addition, concurrent builders could overrun the intended global limit
because there was no wait/coordination when the budget was exhausted.
## Root Cause
In `faiss_ann_index.cpp`:
- The constructor used `config::omp_threads_limit` directly instead of
the auto-resolved limit from `get_omp_threads_limit()`.
- No blocking mechanism existed when the entire OpenMP budget was already
in use, so every new builder still reserved at least 1 thread.
## Fix
- Use `get_omp_threads_limit()` as the actual global limit for
budgeting.
- Add a condition variable that blocks builders until at least one OpenMP
slot is available; releasing a reservation wakes all waiting builders.
- Keep the existing policy of reserving up to half of the remaining budget
(minimum 1 thread).
- Add concise comments to explain wait/wakeup behavior.
## Test
- Added `VectorSearchTest.OmpThreadBudgetNeverExceedsLimit` in
`be/test/storage/index/ann/faiss_vector_index_test.cpp`.
- The test sets `config::omp_threads_limit = 1`, runs multiple
concurrent ANN `add()` builds, samples `ann_index_build_index_threads`,
and asserts that peak usage never exceeds 1 and that usage returns to 0 after all builds finish.
## Validation
- Local compilation and test runs were intentionally skipped, as requested.
---
be/src/storage/index/ann/faiss_ann_index.cpp | 9 ++-
.../storage/index/ann/faiss_vector_index_test.cpp | 65 ++++++++++++++++++++++
2 files changed, 73 insertions(+), 1 deletion(-)
diff --git a/be/src/storage/index/ann/faiss_ann_index.cpp
b/be/src/storage/index/ann/faiss_ann_index.cpp
index 77b865d82bf..70a2ed01bc9 100644
--- a/be/src/storage/index/ann/faiss_ann_index.cpp
+++ b/be/src/storage/index/ann/faiss_ann_index.cpp
@@ -62,6 +62,7 @@ namespace doris::segment_v2 {
namespace {
std::mutex g_omp_thread_mutex;
+std::condition_variable g_omp_thread_cv;
int g_index_threads_in_use = 0;
// Guard that ensures the total OpenMP threads used by concurrent index builds
@@ -71,7 +72,11 @@ public:
// For each index build, reserve at most half of the remaining threads, at
least 1 thread.
ScopedOmpThreadBudget() {
std::unique_lock<std::mutex> lock(g_omp_thread_mutex);
- auto thread_cap = config::omp_threads_limit - g_index_threads_in_use;
+ auto omp_threads_limit = get_omp_threads_limit();
+ // Block until there is at least one OpenMP slot available under the
global cap.
+ g_omp_thread_cv.wait(lock, [&] { return g_index_threads_in_use <
omp_threads_limit; });
+ auto thread_cap = omp_threads_limit - g_index_threads_in_use;
+ // Keep headroom for other concurrent index builds: take up to half of
remaining budget.
_reserved_threads = std::max(1, thread_cap / 2);
g_index_threads_in_use += _reserved_threads;
DorisMetrics::instance()->ann_index_build_index_threads->increment(_reserved_threads);
@@ -88,6 +93,8 @@ public:
if (g_index_threads_in_use < 0) {
g_index_threads_in_use = 0;
}
+ // Wake waiting index builders so they can compete for the released
OpenMP budget.
+ g_omp_thread_cv.notify_all();
VLOG_DEBUG << fmt::format(
"ScopedOmpThreadBudget release threads reserved={},
remaining_in_use={}, limit={}",
_reserved_threads, g_index_threads_in_use,
get_omp_threads_limit());
diff --git a/be/test/storage/index/ann/faiss_vector_index_test.cpp
b/be/test/storage/index/ann/faiss_vector_index_test.cpp
index 9b4016bbcb8..d9a61adcaeb 100644
--- a/be/test/storage/index/ann/faiss_vector_index_test.cpp
+++ b/be/test/storage/index/ann/faiss_vector_index_test.cpp
@@ -21,18 +21,24 @@
#include <gtest/gtest.h>
#include <algorithm>
+#include <atomic>
+#include <chrono>
#include <cstddef>
#include <limits>
#include <memory>
#include <random>
#include <string>
+#include <thread>
#include <vector>
+#include "common/config.h"
+#include "common/metrics/doris_metrics.h"
#include "storage/index/ann/ann_index.h"
#include "storage/index/ann/ann_search_params.h"
#include "storage/index/ann/faiss_ann_index.h"
// metrics.h not used directly here
#include "storage/index/ann/vector_search_utils.h"
+#include "util/defer_op.h"
using namespace doris::segment_v2;
@@ -233,6 +239,65 @@ TEST_F(VectorSearchTest, UpdateRoaring) {
}
}
+TEST_F(VectorSearchTest, OmpThreadBudgetNeverExceedsLimit) { // Concurrent ANN add() calls must never hold more OpenMP budget than omp_threads_limit.
+ constexpr int kWorkers = 2; // Two builders so the second can contend for the single slot.
+ constexpr int kDim = 64;
+ // Keep the workload small so this BE unit test stays fast under ASAN.
+ constexpr int kNumVectors = 500;
+
+ const auto old_omp_threads_limit = config::omp_threads_limit;
+ config::omp_threads_limit = 1; // Each reservation is >= 1, so any overlap would push the metric above 1.
+ Defer reset_omp_threads_limit( // Restore the original limit when the test scope exits.
+ [&old_omp_threads_limit]() { config::omp_threads_limit =
old_omp_threads_limit; });
+
+ auto* budget_metric =
DorisMetrics::instance()->ann_index_build_index_threads; // Incremented by each ScopedOmpThreadBudget reservation.
+ std::atomic<bool> start {false}; // Releases all workers at once to maximize overlap of add().
+ std::atomic<int> finished {0};
+ std::vector<std::thread> workers;
+ workers.reserve(kWorkers);
+
+ for (int worker_id = 0; worker_id < kWorkers; ++worker_id) {
+ workers.emplace_back([&start, &finished, worker_id]() {
+ auto index = std::make_unique<FaissVectorIndex>();
+ FaissBuildParameter params;
+ params.dim = kDim;
+ params.max_degree = 8;
+ params.ef_construction = 20;
+ params.index_type = FaissBuildParameter::IndexType::HNSW;
+ index->build(params);
+
+ std::vector<float> vectors(static_cast<size_t>(kNumVectors) * kDim,
+ static_cast<float>(worker_id + 1));
+ while (!start.load(std::memory_order_acquire)) { // build() ran above, so only add() overlaps across workers.
+ std::this_thread::yield();
+ }
+
+ auto st = index->add(kNumVectors, vectors.data());
+ EXPECT_TRUE(st.ok()) << st.to_string();
+ finished.fetch_add(1, std::memory_order_acq_rel);
+ });
+ }
+
+ start.store(true, std::memory_order_release);
+
+ int64_t observed_peak = 0;
+ auto deadline = std::chrono::steady_clock::now() +
std::chrono::seconds(20); // Bounds only the sampling loop; join() below still waits for every worker.
+ while (finished.load(std::memory_order_acquire) < kWorkers &&
+ std::chrono::steady_clock::now() < deadline) {
+ observed_peak = std::max<int64_t>(observed_peak,
budget_metric->value());
+ std::this_thread::sleep_for(std::chrono::milliseconds(1)); // Polling can miss short spikes; this is a best-effort upper-bound check.
+ }
+
+ for (auto& worker : workers) {
+ worker.join();
+ }
+
+ observed_peak = std::max<int64_t>(observed_peak, budget_metric->value()); // One last sample after all workers have joined.
+ EXPECT_EQ(finished.load(std::memory_order_acquire), kWorkers);
+ EXPECT_LE(observed_peak, 1);
+ EXPECT_EQ(budget_metric->value(), 0); // Every reservation must have been released by now.
+}
+
TEST_F(VectorSearchTest, CompareResultWithNativeFaiss1) {
const size_t iterations = 3;
// Create random number generator
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]