This is an automated email from the ASF dual-hosted git repository.

zclllyybb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ab1a4ddb592 [refine](exec) replace std::shared_mutex/std::shared_lock 
with annotated wrappers for thread safety analysis (#63109)
ab1a4ddb592 is described below

commit ab1a4ddb59263c9f48d60c35e016212858a9547a
Author: Mryange <[email protected]>
AuthorDate: Mon May 18 16:20:49 2026 +0800

    [refine](exec) replace std::shared_mutex/std::shared_lock with annotated 
wrappers for thread safety analysis (#63109)
    
    After #63070 replaced `std::mutex`/`std::lock_guard` with annotated
    wrappers (`AnnotatedMutex`/`LockGuard`), shared mutex usage still relied
    on raw `std::shared_mutex` and `std::shared_lock`/`std::unique_lock`,
    which are invisible to Clang's thread safety analysis. This leaves
    shared-lock sites unverified — `GUARDED_BY`, `REQUIRES_SHARED`, and
    other annotations cannot be enforced.
    
    - Add `AnnotatedSharedMutex` (wrapping `std::shared_mutex` with
    `CAPABILITY`/`ACQUIRE`/`RELEASE`/`ACQUIRE_SHARED`/`RELEASE_SHARED`
    annotations) and `SharedLockGuard` (RAII `SCOPED_CAPABILITY` with
    `ACQUIRE_SHARED`/`RELEASE`) in `thread_safety_annotations.h`.
    - Migrate `VDataStreamMgr` and `RuntimeFilterMergeControllerEntity` from
    `std::shared_mutex` to `AnnotatedSharedMutex`, and from
    `std::unique_lock`/`std::shared_lock` to `LockGuard`/`SharedLockGuard`.
    - Add `GUARDED_BY` annotations to the protected maps.
    - Extract `_find_recvr` as a private helper annotated with
    `REQUIRES_SHARED(_lock)`, eliminating the `bool acquire_lock` parameter
    that previously bypassed lock tracking.
---
 be/src/common/thread_safety_annotations.h         | 48 +++++++++++++++++++++++
 be/src/exec/exchange/vdata_stream_mgr.cpp         | 28 +++++++------
 be/src/exec/exchange/vdata_stream_mgr.h           | 14 ++++---
 be/src/exec/runtime_filter/runtime_filter_mgr.cpp | 20 +++++-----
 be/src/exec/runtime_filter/runtime_filter_mgr.h   |  9 ++---
 be/test/exec/pipeline/vdata_stream_recvr_test.cpp |  2 +-
 6 files changed, 87 insertions(+), 34 deletions(-)

diff --git a/be/src/common/thread_safety_annotations.h 
b/be/src/common/thread_safety_annotations.h
index 6cd8d4b0cae..6bbdb8ce654 100644
--- a/be/src/common/thread_safety_annotations.h
+++ b/be/src/common/thread_safety_annotations.h
@@ -22,6 +22,7 @@
 #pragma once
 
 #include <mutex>
+#include <shared_mutex>
 
 #ifdef BE_TEST
 namespace doris {
@@ -93,6 +94,27 @@ private:
     std::mutex _mutex;
 };
 
+// Annotated shared mutex wrapper for use with Clang thread safety analysis.
+// Wraps std::shared_mutex and provides both exclusive and shared capability
+// operations so GUARDED_BY / REQUIRES_SHARED / etc. can reference it.
+class CAPABILITY("mutex") AnnotatedSharedMutex {
+public:
+    void lock() ACQUIRE() { _mutex.lock(); }
+    void unlock() RELEASE() { _mutex.unlock(); }
+    bool try_lock() TRY_ACQUIRE(true) { return _mutex.try_lock(); }
+
+    void lock_shared() ACQUIRE_SHARED() { _mutex.lock_shared(); }
+    void unlock_shared() RELEASE_SHARED() { _mutex.unlock_shared(); }
+    bool try_lock_shared() TRY_ACQUIRE_SHARED(true) { return 
_mutex.try_lock_shared(); }
+
+    // Access the underlying std::shared_mutex (e.g., for 
std::condition_variable_any).
+    // Use with care — this bypasses thread safety annotations.
+    std::shared_mutex& native_handle() { return _mutex; }
+
+private:
+    std::shared_mutex _mutex;
+};
+
 // RAII scoped lock guard annotated for thread safety analysis.
 // In BE_TEST builds, injects a random sleep before acquiring and after
 // releasing the lock to exercise concurrent code paths.
@@ -119,6 +141,32 @@ private:
     MutexType& _mu;
 };
 
+// RAII scoped shared lock guard annotated for thread safety analysis.
+// In BE_TEST builds, injects a random sleep before acquiring and after
+// releasing the lock to exercise concurrent code paths.
+template <typename MutexType>
+class SCOPED_CAPABILITY SharedLockGuard {
+public:
+    explicit SharedLockGuard(MutexType& mu) ACQUIRE_SHARED(mu) : _mu(mu) {
+#ifdef BE_TEST
+        doris::mock_random_sleep();
+#endif
+        _mu.lock_shared();
+    }
+    ~SharedLockGuard() RELEASE() {
+        _mu.unlock_shared();
+#ifdef BE_TEST
+        doris::mock_random_sleep();
+#endif
+    }
+
+    SharedLockGuard(const SharedLockGuard&) = delete;
+    SharedLockGuard& operator=(const SharedLockGuard&) = delete;
+
+private:
+    MutexType& _mu;
+};
+
 // RAII unique lock annotated for thread safety analysis.
 // Supports manual lock/unlock while preserving capability tracking.
 template <typename MutexType>
diff --git a/be/src/exec/exchange/vdata_stream_mgr.cpp 
b/be/src/exec/exchange/vdata_stream_mgr.cpp
index 70d59ea7673..1ba53bc2b1b 100644
--- a/be/src/exec/exchange/vdata_stream_mgr.cpp
+++ b/be/src/exec/exchange/vdata_stream_mgr.cpp
@@ -43,7 +43,7 @@ VDataStreamMgr::~VDataStreamMgr() {
     // It will core during graceful stop.
     auto receivers = std::vector<std::shared_ptr<VDataStreamRecvr>>();
     {
-        std::shared_lock l(_lock);
+        SharedLockGuard l(_lock);
         auto receiver_iterator = _receiver_map.begin();
         while (receiver_iterator != _receiver_map.end()) {
             // Could not call close directly, because during close method, it 
will remove itself
@@ -76,22 +76,16 @@ std::shared_ptr<VDataStreamRecvr> 
VDataStreamMgr::create_recvr(
             this, memory_used_counter, state, fragment_instance_id, 
dest_node_id, num_senders,
             is_merging, profile, data_queue_capacity));
     uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id);
-    std::unique_lock l(_lock);
+    LockGuard l(_lock);
     _fragment_stream_set.insert(std::make_pair(fragment_instance_id, 
dest_node_id));
     _receiver_map.insert(std::make_pair(hash_value, recvr));
     return recvr;
 }
 
-Status VDataStreamMgr::find_recvr(const TUniqueId& fragment_instance_id, 
PlanNodeId node_id,
-                                  std::shared_ptr<VDataStreamRecvr>* res, bool 
acquire_lock) {
+Status VDataStreamMgr::_find_recvr(uint32_t hash_value, const TUniqueId& 
fragment_instance_id,
+                                   PlanNodeId node_id, 
std::shared_ptr<VDataStreamRecvr>* res) {
     VLOG_ROW << "looking up fragment_instance_id=" << 
print_id(fragment_instance_id)
              << ", node=" << node_id;
-    uint32_t hash_value = get_hash_value(fragment_instance_id, node_id);
-    // Create lock guard and not own lock currently and will lock conditionally
-    std::shared_lock recvr_lock(_lock, std::defer_lock);
-    if (acquire_lock) {
-        recvr_lock.lock();
-    }
     std::pair<StreamMap::iterator, StreamMap::iterator> range =
             _receiver_map.equal_range(hash_value);
     while (range.first != range.second) {
@@ -107,6 +101,13 @@ Status VDataStreamMgr::find_recvr(const TUniqueId& 
fragment_instance_id, PlanNod
                                    node_id, print_id(fragment_instance_id));
 }
 
+Status VDataStreamMgr::find_recvr(const TUniqueId& fragment_instance_id, 
PlanNodeId node_id,
+                                  std::shared_ptr<VDataStreamRecvr>* res) {
+    SharedLockGuard recvr_lock(_lock);
+    uint32_t hash_value = get_hash_value(fragment_instance_id, node_id);
+    return _find_recvr(hash_value, fragment_instance_id, node_id, res);
+}
+
 Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request,
                                       ::google::protobuf::Closure** done,
                                       const int64_t wait_for_worker) {
@@ -173,7 +174,7 @@ Status VDataStreamMgr::deregister_recvr(const TUniqueId& 
fragment_instance_id, P
                << ", node=" << node_id;
     uint32_t hash_value = get_hash_value(fragment_instance_id, node_id);
     {
-        std::unique_lock l(_lock);
+        LockGuard l(_lock);
         auto range = _receiver_map.equal_range(hash_value);
         while (range.first != range.second) {
             const std::shared_ptr<VDataStreamRecvr>& recvr = 
range.first->second;
@@ -204,12 +205,13 @@ void VDataStreamMgr::cancel(const TUniqueId& 
fragment_instance_id, Status exec_s
     VLOG_QUERY << "cancelling all streams for fragment=" << 
print_id(fragment_instance_id);
     std::vector<std::shared_ptr<VDataStreamRecvr>> recvrs;
     {
-        std::shared_lock l(_lock);
+        SharedLockGuard l(_lock);
         FragmentStreamSet::iterator i =
                 
_fragment_stream_set.lower_bound(std::make_pair(fragment_instance_id, 0));
         while (i != _fragment_stream_set.end() && i->first == 
fragment_instance_id) {
             std::shared_ptr<VDataStreamRecvr> recvr;
-            WARN_IF_ERROR(find_recvr(i->first, i->second, &recvr, false), "");
+            uint32_t hash_value = get_hash_value(i->first, i->second);
+            WARN_IF_ERROR(_find_recvr(hash_value, i->first, i->second, 
&recvr), "");
             if (recvr == nullptr) {
                 // keep going but at least log it
                 std::stringstream err;
diff --git a/be/src/exec/exchange/vdata_stream_mgr.h 
b/be/src/exec/exchange/vdata_stream_mgr.h
index 3825aa1f02b..d69758c94fd 100644
--- a/be/src/exec/exchange/vdata_stream_mgr.h
+++ b/be/src/exec/exchange/vdata_stream_mgr.h
@@ -30,6 +30,7 @@
 #include "common/be_mock_util.h"
 #include "common/global_types.h"
 #include "common/status.h"
+#include "common/thread_safety_annotations.h"
 #include "runtime/runtime_profile.h"
 
 namespace google {
@@ -57,8 +58,7 @@ public:
             RuntimeProfile* profile, bool is_merging, size_t 
data_queue_capacity);
 
     MOCK_FUNCTION Status find_recvr(const TUniqueId& fragment_instance_id, 
PlanNodeId node_id,
-                                    std::shared_ptr<VDataStreamRecvr>* res,
-                                    bool acquire_lock = true);
+                                    std::shared_ptr<VDataStreamRecvr>* res);
 
     Status deregister_recvr(const TUniqueId& fragment_instance_id, PlanNodeId 
node_id);
 
@@ -68,9 +68,9 @@ public:
     void cancel(const TUniqueId& fragment_instance_id, Status exec_status);
 
 private:
-    std::shared_mutex _lock;
+    AnnotatedSharedMutex _lock;
     using StreamMap = std::unordered_multimap<uint32_t, 
std::shared_ptr<VDataStreamRecvr>>;
-    StreamMap _receiver_map;
+    StreamMap _receiver_map GUARDED_BY(_lock);
 
     struct ComparisonOp {
         bool operator()(const std::pair<doris::TUniqueId, PlanNodeId>& a,
@@ -88,7 +88,11 @@ private:
         }
     };
     using FragmentStreamSet = std::set<std::pair<TUniqueId, PlanNodeId>, 
ComparisonOp>;
-    FragmentStreamSet _fragment_stream_set;
+    FragmentStreamSet _fragment_stream_set GUARDED_BY(_lock);
+
+    Status _find_recvr(uint32_t hash_value, const TUniqueId& 
fragment_instance_id,
+                       PlanNodeId node_id, std::shared_ptr<VDataStreamRecvr>* 
res)
+            REQUIRES_SHARED(_lock);
 
     uint32_t get_hash_value(const TUniqueId& fragment_instance_id, PlanNodeId 
node_id);
 };
diff --git a/be/src/exec/runtime_filter/runtime_filter_mgr.cpp 
b/be/src/exec/runtime_filter/runtime_filter_mgr.cpp
index 75dbe9a130d..f8e687f09b7 100644
--- a/be/src/exec/runtime_filter/runtime_filter_mgr.cpp
+++ b/be/src/exec/runtime_filter/runtime_filter_mgr.cpp
@@ -189,7 +189,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
     auto filter_id = runtime_filter_desc->filter_id;
     GlobalMergeContext* cnt_val;
     {
-        std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);
+        LockGuard guard(_filter_map_mutex);
         cnt_val = &_filter_map[filter_id]; // may inplace construct default 
object
     }
 
@@ -239,7 +239,7 @@ Status 
RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptr<Quer
     auto filter_id = request->filter_id();
     std::map<int, GlobalMergeContext>::iterator iter;
     {
-        std::shared_lock<std::shared_mutex> guard(_filter_map_mutex);
+        SharedLockGuard guard(_filter_map_mutex);
         iter = _filter_map.find(filter_id);
         if (iter == _filter_map.end()) {
             return Status::InvalidArgument("unknown filter id {}",
@@ -247,12 +247,12 @@ Status 
RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptr<Quer
         }
     }
     auto& cnt_val = iter->second;
-    std::unique_lock<std::mutex> l(iter->second.mtx);
+    std::unique_lock<std::mutex> l(cnt_val.mtx);
     // Discard stale-stage runtime filter size requests from old recursive CTE 
rounds.
     // Each round increments the stage counter; only messages matching the 
current stage
     // should be processed. This prevents old PFC's runtime filters from 
corrupting
     // the merge state of the new round's filters.
-    if (request->stage() != iter->second.stage) {
+    if (request->stage() != cnt_val.stage) {
         return Status::OK();
     }
     cnt_val.source_addrs.push_back(request->source_addr());
@@ -273,7 +273,7 @@ Status 
RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptr<Quer
             }
 
             auto sync_request = std::make_shared<PSyncFilterSizeRequest>();
-            sync_request->set_stage(iter->second.stage);
+            sync_request->set_stage(cnt_val.stage);
 
             auto callback = 
HandleErrorBrpcCallback<PSyncFilterSizeResponse>::create_shared(
                     query_ctx->ignore_runtime_filter_error() ? 
std::weak_ptr<QueryContext> {}
@@ -343,7 +343,7 @@ Status 
RuntimeFilterMergeControllerEntity::merge(std::shared_ptr<QueryContext> q
     auto filter_id = request->filter_id();
     std::map<int, GlobalMergeContext>::iterator iter;
     {
-        std::shared_lock<std::shared_mutex> guard(_filter_map_mutex);
+        SharedLockGuard guard(_filter_map_mutex);
         iter = _filter_map.find(filter_id);
         VLOG_ROW << "recv filter id:" << request->filter_id() << " " << 
request->ShortDebugString();
         if (iter == _filter_map.end()) {
@@ -354,9 +354,9 @@ Status 
RuntimeFilterMergeControllerEntity::merge(std::shared_ptr<QueryContext> q
     auto& cnt_val = iter->second;
     bool is_ready = false;
     {
-        std::lock_guard<std::mutex> l(iter->second.mtx);
+        std::lock_guard<std::mutex> l(cnt_val.mtx);
         // Discard stale-stage merge requests from old recursive CTE rounds.
-        if (request->stage() != iter->second.stage) {
+        if (request->stage() != cnt_val.stage) {
             return Status::OK();
         }
         if (cnt_val.merger == nullptr) {
@@ -508,7 +508,7 @@ Status RuntimeFilterMergeControllerEntity::reset_global_rf(
     for (const auto& filter_id : filter_ids) {
         GlobalMergeContext* cnt_val;
         {
-            std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);
+            LockGuard guard(_filter_map_mutex);
             cnt_val = &_filter_map[filter_id]; // may inplace construct 
default object
         }
         RETURN_IF_ERROR(cnt_val->reset(query_ctx));
@@ -518,7 +518,7 @@ Status RuntimeFilterMergeControllerEntity::reset_global_rf(
 
 std::string RuntimeFilterMergeControllerEntity::debug_string() {
     std::string result = "RuntimeFilterMergeControllerEntity Info:\n";
-    std::shared_lock<std::shared_mutex> guard(_filter_map_mutex);
+    SharedLockGuard guard(_filter_map_mutex);
     for (const auto& [filter_id, ctx] : _filter_map) {
         result += fmt::format("filter_id: {}, stage: {}, {}\n", filter_id, 
ctx.stage,
                               ctx.merger->debug_string());
diff --git a/be/src/exec/runtime_filter/runtime_filter_mgr.h 
b/be/src/exec/runtime_filter/runtime_filter_mgr.h
index b25d9956ad8..536eb63e152 100644
--- a/be/src/exec/runtime_filter/runtime_filter_mgr.h
+++ b/be/src/exec/runtime_filter/runtime_filter_mgr.h
@@ -27,12 +27,11 @@
 #include <map>
 #include <memory>
 #include <mutex>
-#include <shared_mutex>
 #include <unordered_set>
-#include <utility>
 #include <vector>
 
 #include "common/status.h"
+#include "common/thread_safety_annotations.h"
 #include "util/uid_util.h"
 
 namespace butil {
@@ -174,7 +173,7 @@ public:
     std::string debug_string();
 
     bool empty() {
-        std::shared_lock<std::shared_mutex> read_lock(_filter_map_mutex);
+        SharedLockGuard read_lock(_filter_map_mutex);
         return _filter_map.empty();
     }
 
@@ -191,9 +190,9 @@ private:
                               int64_t merge_time, PUniqueId query_id, int 
execution_timeout);
 
     // protect _filter_map
-    std::shared_mutex _filter_map_mutex;
+    AnnotatedSharedMutex _filter_map_mutex;
     std::shared_ptr<MemTracker> _mem_tracker;
 
-    std::map<int, GlobalMergeContext> _filter_map;
+    std::map<int, GlobalMergeContext> _filter_map 
GUARDED_BY(_filter_map_mutex);
 };
 } // namespace doris
diff --git a/be/test/exec/pipeline/vdata_stream_recvr_test.cpp 
b/be/test/exec/pipeline/vdata_stream_recvr_test.cpp
index ab6b03b13c5..f0c4e05c7e6 100644
--- a/be/test/exec/pipeline/vdata_stream_recvr_test.cpp
+++ b/be/test/exec/pipeline/vdata_stream_recvr_test.cpp
@@ -577,7 +577,7 @@ TEST_F(DataStreamRecvrTest, TestRemoteLocalMultiSender) {
 struct MockVDataStreamMgr : public VDataStreamMgr {
     ~MockVDataStreamMgr() override = default;
     Status find_recvr(const TUniqueId& fragment_instance_id, PlanNodeId 
node_id,
-                      std::shared_ptr<VDataStreamRecvr>* res, bool 
acquire_lock = true) override {
+                      std::shared_ptr<VDataStreamRecvr>* res) override {
         *res = recvr;
         return Status::OK();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to