This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new a47f436f025 [pick](branch-4.1)pick 62947 63055 63070 to 4.1 (#63297)
a47f436f025 is described below

commit a47f436f0256c4dc8e4eaca106b667477321495a
Author: Mryange <[email protected]>
AuthorDate: Wed May 20 15:37:51 2026 +0800

    [pick](branch-4.1)pick 62947 63055 63070 to 4.1 (#63297)
    
    ### What problem does this PR solve?
    https://github.com/apache/doris/pull/62947
    https://github.com/apache/doris/pull/63055
    https://github.com/apache/doris/pull/63070
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
    - [ ] Yes. <!-- Add document PR link here. eg:
    https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 be/CMakeLists.txt                                  |   5 +-
 be/src/common/thread_safety_annotations.h          | 171 ++++++++++++
 be/src/exec/operator/analytic_sink_operator.cpp    |   4 +-
 be/src/exec/operator/analytic_source_operator.cpp  |   9 +-
 be/src/exec/operator/data_queue.cpp                | 233 +++++++++-------
 be/src/exec/operator/data_queue.h                  | 111 ++++----
 be/src/exec/operator/exchange_sink_operator.cpp    |   3 +-
 be/src/exec/operator/exchange_sink_operator.h      |   6 +-
 be/src/exec/operator/hashjoin_build_sink.cpp       |   8 +-
 be/src/exec/operator/hashjoin_build_sink.h         |   5 +-
 be/src/exec/operator/multi_cast_data_streamer.cpp  |  19 +-
 be/src/exec/operator/multi_cast_data_streamer.h    |  19 +-
 .../exec/operator/partition_sort_sink_operator.cpp |  10 +-
 .../operator/partition_sort_source_operator.cpp    |   9 +-
 be/src/exec/operator/scan_operator.cpp             |   4 +-
 be/src/exec/operator/scan_operator.h               |   4 +-
 be/src/exec/pipeline/dependency.h                  |  19 +-
 be/test/exec/pipeline/data_queue_test.cpp          | 309 ++++++++++++++++++++-
 .../pipeline/multi_cast_data_streamer_test.cpp     |  17 +-
 19 files changed, 740 insertions(+), 225 deletions(-)

diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 1de3541d3eb..d9f9c4bce82 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -335,8 +335,9 @@ if (COMPILER_CLANG)
                         -Wunused-template
                         -Wunused-member-function
                         -Wunused-macros
-                        -Wconversion)
-    add_compile_options( -Wno-gnu-statement-expression
+                        -Wconversion
+                        -Wthread-safety)
+    add_compile_options(-Wno-gnu-statement-expression
                         -Wno-implicit-float-conversion
                         -Wno-sign-conversion
                         )
diff --git a/be/src/common/thread_safety_annotations.h 
b/be/src/common/thread_safety_annotations.h
new file mode 100644
index 00000000000..6cd8d4b0cae
--- /dev/null
+++ b/be/src/common/thread_safety_annotations.h
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Thread safety annotation macros and annotated mutex wrappers for
+// Clang's -Wthread-safety static analysis.
+// Reference: https://clang.llvm.org/docs/ThreadSafetyAnalysis.html
+
+#pragma once
+
+#include <mutex>
+
+#ifdef BE_TEST
+namespace doris {
+void mock_random_sleep();
+} // namespace doris
+#endif
+
+// Enable thread safety attributes only with clang.
+// The attributes can be safely erased when compiling with other compilers.
+#if defined(__clang__) && (!defined(SWIG))
+#define THREAD_ANNOTATION_ATTRIBUTE__(x) __attribute__((x))
+#else
+#define THREAD_ANNOTATION_ATTRIBUTE__(x) // no-op
+#endif
+
+#define CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(capability(x))
+
+#define SCOPED_CAPABILITY THREAD_ANNOTATION_ATTRIBUTE__(scoped_lockable)
+
+#define GUARDED_BY(x) THREAD_ANNOTATION_ATTRIBUTE__(guarded_by(x))
+
+#define PT_GUARDED_BY(x) THREAD_ANNOTATION_ATTRIBUTE__(pt_guarded_by(x))
+
+#define ACQUIRED_BEFORE(...) 
THREAD_ANNOTATION_ATTRIBUTE__(acquired_before(__VA_ARGS__))
+
+#define ACQUIRED_AFTER(...) 
THREAD_ANNOTATION_ATTRIBUTE__(acquired_after(__VA_ARGS__))
+
+#define REQUIRES(...) 
THREAD_ANNOTATION_ATTRIBUTE__(requires_capability(__VA_ARGS__))
+
+#define REQUIRES_SHARED(...) 
THREAD_ANNOTATION_ATTRIBUTE__(requires_shared_capability(__VA_ARGS__))
+
+#define ACQUIRE(...) 
THREAD_ANNOTATION_ATTRIBUTE__(acquire_capability(__VA_ARGS__))
+
+#define ACQUIRE_SHARED(...) 
THREAD_ANNOTATION_ATTRIBUTE__(acquire_shared_capability(__VA_ARGS__))
+
+#define RELEASE(...) 
THREAD_ANNOTATION_ATTRIBUTE__(release_capability(__VA_ARGS__))
+
+#define RELEASE_SHARED(...) 
THREAD_ANNOTATION_ATTRIBUTE__(release_shared_capability(__VA_ARGS__))
+
+#define TRY_ACQUIRE(...) 
THREAD_ANNOTATION_ATTRIBUTE__(try_acquire_capability(__VA_ARGS__))
+
+#define TRY_ACQUIRE_SHARED(...) \
+    THREAD_ANNOTATION_ATTRIBUTE__(try_acquire_shared_capability(__VA_ARGS__))
+
+#define EXCLUDES(...) 
THREAD_ANNOTATION_ATTRIBUTE__(locks_excluded(__VA_ARGS__))
+
+#define ASSERT_CAPABILITY(x) 
THREAD_ANNOTATION_ATTRIBUTE__(assert_capability(x))
+
+#define ASSERT_SHARED_CAPABILITY(x) 
THREAD_ANNOTATION_ATTRIBUTE__(assert_shared_capability(x))
+
+#define RETURN_CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(lock_returned(x))
+
+#define NO_THREAD_SAFETY_ANALYSIS 
THREAD_ANNOTATION_ATTRIBUTE__(no_thread_safety_analysis)
+
+// Annotated mutex wrapper for use with Clang thread safety analysis.
+// Wraps std::mutex and provides the CAPABILITY annotation so that
+// GUARDED_BY / REQUIRES / etc. annotations can reference it.
+class CAPABILITY("mutex") AnnotatedMutex {
+public:
+    void lock() ACQUIRE() { _mutex.lock(); }
+    void unlock() RELEASE() { _mutex.unlock(); }
+    bool try_lock() TRY_ACQUIRE(true) { return _mutex.try_lock(); }
+
+    // Access the underlying std::mutex (e.g., for std::condition_variable).
+    // Use with care — this bypasses thread safety annotations.
+    std::mutex& native_handle() { return _mutex; }
+
+private:
+    std::mutex _mutex;
+};
+
+// RAII scoped lock guard annotated for thread safety analysis.
+// In BE_TEST builds, injects a random sleep before acquiring and after
+// releasing the lock to exercise concurrent code paths.
+template <typename MutexType>
+class SCOPED_CAPABILITY LockGuard {
+public:
+    explicit LockGuard(MutexType& mu) ACQUIRE(mu) : _mu(mu) {
+#ifdef BE_TEST
+        doris::mock_random_sleep();
+#endif
+        _mu.lock();
+    }
+    ~LockGuard() RELEASE() {
+        _mu.unlock();
+#ifdef BE_TEST
+        doris::mock_random_sleep();
+#endif
+    }
+
+    LockGuard(const LockGuard&) = delete;
+    LockGuard& operator=(const LockGuard&) = delete;
+
+private:
+    MutexType& _mu;
+};
+
+// RAII unique lock annotated for thread safety analysis.
+// Supports manual lock/unlock while preserving capability tracking.
+template <typename MutexType>
+class SCOPED_CAPABILITY UniqueLock {
+public:
+    explicit UniqueLock(MutexType& mu) ACQUIRE(mu) : _mu(&mu), _locked(true) {
+#ifdef BE_TEST
+        doris::mock_random_sleep();
+#endif
+        _mu->lock();
+    }
+
+    UniqueLock(MutexType& mu, std::adopt_lock_t) REQUIRES(mu) : _mu(&mu), 
_locked(true) {}
+
+    UniqueLock(MutexType& mu, std::defer_lock_t) EXCLUDES(mu) : _mu(&mu), 
_locked(false) {}
+
+    ~UniqueLock() RELEASE() {
+        if (_locked) {
+            _mu->unlock();
+#ifdef BE_TEST
+            doris::mock_random_sleep();
+#endif
+        }
+    }
+
+    void lock() ACQUIRE() {
+#ifdef BE_TEST
+        doris::mock_random_sleep();
+#endif
+        _mu->lock();
+        _locked = true;
+    }
+
+    void unlock() RELEASE() {
+        _mu->unlock();
+        _locked = false;
+#ifdef BE_TEST
+        doris::mock_random_sleep();
+#endif
+    }
+
+    bool owns_lock() const { return _locked; }
+
+    UniqueLock(const UniqueLock&) = delete;
+    UniqueLock& operator=(const UniqueLock&) = delete;
+
+private:
+    MutexType* _mu;
+    bool _locked;
+};
diff --git a/be/src/exec/operator/analytic_sink_operator.cpp 
b/be/src/exec/operator/analytic_sink_operator.cpp
index db5ac2ecd37..7a6b0d659c0 100644
--- a/be/src/exec/operator/analytic_sink_operator.cpp
+++ b/be/src/exec/operator/analytic_sink_operator.cpp
@@ -461,7 +461,7 @@ void AnalyticSinkLocalState::_init_result_columns() {
 void AnalyticSinkLocalState::_refresh_buffer_and_dependency_state(Block* 
block) {
     size_t buffer_size = 0;
     {
-        std::unique_lock<std::mutex> lc(_shared_state->buffer_mutex);
+        LockGuard lc(_shared_state->buffer_mutex);
         _shared_state->blocks_buffer.push(std::move(*block));
         buffer_size = _shared_state->blocks_buffer.size();
     }
@@ -756,7 +756,7 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* 
state, Block* input_bloc
     RETURN_IF_ERROR(_add_input_block(state, input_block));
     RETURN_IF_ERROR(local_state._execute_impl());
     if (local_state._input_eos) {
-        std::unique_lock<std::mutex> 
lc(local_state._shared_state->sink_eos_lock);
+        LockGuard lc(local_state._shared_state->sink_eos_lock);
         local_state._shared_state->sink_eos = true;
         local_state._dependency->set_ready_to_read(); // ready for source to 
read
     }
diff --git a/be/src/exec/operator/analytic_source_operator.cpp 
b/be/src/exec/operator/analytic_source_operator.cpp
index de385e3dede..efdb3055ff6 100644
--- a/be/src/exec/operator/analytic_source_operator.cpp
+++ b/be/src/exec/operator/analytic_source_operator.cpp
@@ -53,7 +53,7 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState* 
state, Block* output_blo
     output_block->clear_column_data();
     size_t output_rows = 0;
     {
-        std::lock_guard<std::mutex> 
lock(local_state._shared_state->buffer_mutex);
+        LockGuard lock(local_state._shared_state->buffer_mutex);
         if (!local_state._shared_state->blocks_buffer.empty()) {
             
local_state._shared_state->blocks_buffer.front().swap(*output_block);
             local_state._shared_state->blocks_buffer.pop();
@@ -61,11 +61,10 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState* 
state, Block* output_blo
             //if buffer have no data and sink not eos, block reading and wait 
for signal again
             RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, 
output_block,
                                                      output_block->columns()));
-            if (local_state._shared_state->blocks_buffer.empty() &&
-                !local_state._shared_state->sink_eos) {
+            if (local_state._shared_state->blocks_buffer.empty()) {
                 // add this mutex to check, as in some case maybe is doing 
block(), and the sink is doing set eos.
                 // so have to hold mutex to set block(), avoid to sink have 
set eos and set ready, but here set block() by mistake
-                std::unique_lock<std::mutex> 
lc(local_state._shared_state->sink_eos_lock);
+                LockGuard lc(local_state._shared_state->sink_eos_lock);
                 if (!local_state._shared_state->sink_eos) {
                     local_state._dependency->block();              // block 
self source
                     local_state._dependency->set_ready_to_write(); // ready 
for sink write
@@ -73,7 +72,7 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState* 
state, Block* output_blo
             }
         } else {
             //iff buffer have no data and sink eos, set eos
-            std::unique_lock<std::mutex> 
lc(local_state._shared_state->sink_eos_lock);
+            LockGuard lc(local_state._shared_state->sink_eos_lock);
             *eos = local_state._shared_state->sink_eos;
         }
     }
diff --git a/be/src/exec/operator/data_queue.cpp 
b/be/src/exec/operator/data_queue.cpp
index 84ba08f309f..b41f342ba07 100644
--- a/be/src/exec/operator/data_queue.cpp
+++ b/be/src/exec/operator/data_queue.cpp
@@ -20,43 +20,114 @@
 #include <glog/logging.h>
 
 #include <algorithm>
-#include <mutex>
 #include <utility>
 
+#include "common/thread_safety_annotations.h"
 #include "core/block/block.h"
 #include "exec/pipeline/dependency.h"
 
 namespace doris {
 #include "common/compile_check_begin.h"
-DataQueue::DataQueue(int child_count)
-        : _queue_blocks_lock(child_count),
-          _queue_blocks(child_count),
-          _free_blocks_lock(child_count),
-          _free_blocks(child_count),
-          _child_count(child_count),
-          _is_finished(child_count),
-          _is_canceled(child_count),
-          _cur_bytes_in_queue(child_count),
-          _cur_blocks_nums_in_queue(child_count),
-          _flag_queue_idx(0) {
-    for (int i = 0; i < child_count; ++i) {
-        _queue_blocks_lock[i].reset(new std::mutex());
-        _free_blocks_lock[i].reset(new std::mutex());
-        _is_finished[i] = false;
-        _is_canceled[i] = false;
-        _cur_bytes_in_queue[i] = 0;
-        _cur_blocks_nums_in_queue[i] = 0;
+
+void SubQueue::try_pop(std::unique_ptr<Block>* output_block) {
+    LockGuard l(queue_lock);
+    if (!blocks.empty()) {
+        *output_block = std::move(blocks.front());
+        blocks.pop_front();
+        bytes_in_queue -= (*output_block)->allocated_bytes();
+        blocks_in_queue -= 1;
+        if (blocks.empty()) {
+            sink_dependency->set_ready();
+        }
+    }
+}
+
+bool SubQueue::try_push(std::unique_ptr<Block> block, std::atomic_uint32_t& 
total_counter) {
+    LockGuard l(queue_lock);
+    if (is_finished) {
+        return false;
+    }
+    total_counter++;
+    bytes_in_queue += block->allocated_bytes();
+    blocks.emplace_back(std::move(block));
+    blocks_in_queue += 1;
+    if (static_cast<int64_t>(blocks.size()) > max_blocks_in_queue.load()) {
+        sink_dependency->block();
+    }
+    return true;
+}
+
+bool SubQueue::mark_finished(std::atomic_uint32_t& unfinished_counter,
+                             std::atomic_bool& all_finished) {
+    LockGuard l(queue_lock);
+    if (is_finished) {
+        return false;
+    }
+    is_finished = true;
+    if (unfinished_counter.fetch_sub(1) == 1) {
+        all_finished = true;
+    }
+    return true;
+}
+
+void SubQueue::clear_blocks() {
+    bool need_set_always_ready = false;
+    {
+        LockGuard l(queue_lock);
+        if (!blocks.empty()) {
+            blocks.clear();
+            bytes_in_queue = 0;
+            blocks_in_queue = 0;
+            need_set_always_ready = true;
+        }
+    }
+    // Notify outside of queue_lock to keep lock ordering simple.
+    if (need_set_always_ready) {
+        sink_dependency->set_always_ready();
+    }
+}
+
+DataQueue::DataQueue(int child_count) : _sub_queues(child_count), 
_child_count(child_count) {
+    for (auto& sub : _sub_queues) {
+        sub = std::make_unique<SubQueue>();
     }
     _un_finished_counter = child_count;
-    _sink_dependencies.resize(child_count, nullptr);
+}
+
+bool DataQueue::has_more_data() const {
+    return _cur_blocks_total_nums.load() > 0;
+}
+
+void DataQueue::set_source_dependency(std::shared_ptr<Dependency> 
source_dependency)
+        NO_THREAD_SAFETY_ANALYSIS {
+    _source_dependency = std::move(source_dependency);
+}
+
+void DataQueue::set_sink_dependency(Dependency* sink_dependency, int 
child_idx) {
+    _sub_queues[child_idx]->sink_dependency = sink_dependency;
+}
+
+void DataQueue::set_max_blocks_in_sub_queue(int64_t max_blocks) {
+    for (auto& sub : _sub_queues) {
+        sub->max_blocks_in_queue = max_blocks;
+    }
+}
+
+void DataQueue::set_low_memory_mode() {
+    _is_low_memory_mode = true;
+    for (auto& sub : _sub_queues) {
+        sub->max_blocks_in_queue = 1;
+    }
+    clear_free_blocks();
 }
 
 std::unique_ptr<Block> DataQueue::get_free_block(int child_idx) {
+    auto& sub = *_sub_queues[child_idx];
     {
-        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> 
l(*_free_blocks_lock[child_idx]));
-        if (!_free_blocks[child_idx].empty()) {
-            auto block = std::move(_free_blocks[child_idx].front());
-            _free_blocks[child_idx].pop_front();
+        LockGuard l(sub.free_lock);
+        if (!sub.free_blocks.empty()) {
+            auto block = std::move(sub.free_blocks.front());
+            sub.free_blocks.pop_front();
             return block;
         }
     }
@@ -68,29 +139,24 @@ void DataQueue::push_free_block(std::unique_ptr<Block> 
block, int child_idx) {
     DCHECK(block->rows() == 0);
 
     if (!_is_low_memory_mode) {
-        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> 
l(*_free_blocks_lock[child_idx]));
-        _free_blocks[child_idx].emplace_back(std::move(block));
+        auto& sub = *_sub_queues[child_idx];
+        LockGuard l(sub.free_lock);
+        sub.free_blocks.emplace_back(std::move(block));
     }
 }
 
 void DataQueue::clear_free_blocks() {
-    for (size_t child_idx = 0; child_idx < _free_blocks.size(); ++child_idx) {
-        std::lock_guard<std::mutex> l(*_free_blocks_lock[child_idx]);
+    for (auto& sub : _sub_queues) {
+        LockGuard l(sub->free_lock);
         std::deque<std::unique_ptr<Block>> tmp_queue;
-        _free_blocks[child_idx].swap(tmp_queue);
+        sub->free_blocks.swap(tmp_queue);
     }
 }
 
 void DataQueue::terminate() {
-    for (int i = 0; i < _queue_blocks.size(); i++) {
+    for (int i = 0; i < _child_count; ++i) {
         set_finish(i);
-        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> 
l(*_queue_blocks_lock[i]));
-        if (_cur_blocks_nums_in_queue[i] > 0) {
-            _queue_blocks[i].clear();
-            _cur_bytes_in_queue[i] = 0;
-            _cur_blocks_nums_in_queue[i] = 0;
-            _sink_dependencies[i]->set_always_ready();
-        }
+        _sub_queues[i]->clear_blocks();
     }
     clear_free_blocks();
 }
@@ -105,7 +171,7 @@ bool DataQueue::remaining_has_data() {
         if (_flag_queue_idx == _child_count) {
             _flag_queue_idx = 0;
         }
-        if (_cur_blocks_nums_in_queue[_flag_queue_idx] > 0) {
+        if (_sub_queues[_flag_queue_idx]->blocks_in_queue.load() > 0) {
             return true;
         }
     }
@@ -115,28 +181,17 @@ bool DataQueue::remaining_has_data() {
 //the _flag_queue_idx indicate which queue has data, and in check can_read
 //will be set idx in remaining_has_data function
 Status DataQueue::get_block_from_queue(std::unique_ptr<Block>* output_block, 
int* child_idx) {
-    if (_is_canceled[_flag_queue_idx]) {
-        return Status::InternalError("Current queue of idx {} have beed 
canceled: ",
-                                     _flag_queue_idx);
-    }
+    const int idx = _flag_queue_idx;
+    auto& sub = *_sub_queues[idx];
 
-    {
-        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> 
l(*_queue_blocks_lock[_flag_queue_idx]));
-        if (_cur_blocks_nums_in_queue[_flag_queue_idx] > 0) {
-            *output_block = std::move(_queue_blocks[_flag_queue_idx].front());
-            _queue_blocks[_flag_queue_idx].pop_front();
-            if (child_idx) {
-                *child_idx = _flag_queue_idx;
-            }
-            _cur_bytes_in_queue[_flag_queue_idx] -= 
(*output_block)->allocated_bytes();
-            _cur_blocks_nums_in_queue[_flag_queue_idx] -= 1;
-            if (_cur_blocks_nums_in_queue[_flag_queue_idx] == 0) {
-                _sink_dependencies[_flag_queue_idx]->set_ready();
-            }
-            auto old_value = _cur_blocks_total_nums.fetch_sub(1);
-            if (old_value == 1 && _source_dependency) {
-                set_source_block();
-            }
+    sub.try_pop(output_block);
+    if (*output_block) {
+        if (child_idx) {
+            *child_idx = idx;
+        }
+        auto old_total = _cur_blocks_total_nums.fetch_sub(1);
+        if (old_total == 1) {
+            set_source_block();
         }
     }
     return Status::OK();
@@ -146,70 +201,44 @@ Status DataQueue::push_block(std::unique_ptr<Block> 
block, int child_idx) {
     if (!block) {
         return Status::OK();
     }
-    {
-        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> 
l(*_queue_blocks_lock[child_idx]));
-        if (_is_finished[child_idx]) {
-            return Status::EndOfFile("Already finish");
-        }
-        _cur_bytes_in_queue[child_idx] += block->allocated_bytes();
-        _queue_blocks[child_idx].emplace_back(std::move(block));
-        _cur_blocks_nums_in_queue[child_idx] += 1;
-
-        if (_cur_blocks_nums_in_queue[child_idx] > _max_blocks_in_sub_queue) {
-            _sink_dependencies[child_idx]->block();
-        }
-        _cur_blocks_total_nums++;
-
-        set_source_ready();
+    auto& sub = *_sub_queues[child_idx];
+    // total_counter is incremented inside try_push under queue_lock, only 
when the
+    // block is actually enqueued. This ensures get_block_from_queue() always 
observes
+    // _cur_blocks_total_nums >= 1 when it successfully pops a block, with no 
risk of
+    // underflow or the need for a rollback on failure.
+    if (!sub.try_push(std::move(block), _cur_blocks_total_nums)) {
+        return Status::EndOfFile("SubQueue already finished");
     }
+    set_source_ready();
     return Status::OK();
 }
 
 void DataQueue::set_finish(int child_idx) {
-    INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> 
l(*_queue_blocks_lock[child_idx]));
-    if (_is_finished[child_idx]) {
+    auto& sub = *_sub_queues[child_idx];
+    if (!sub.mark_finished(_un_finished_counter, _is_all_finished)) {
         return;
     }
-    _is_finished[child_idx] = true;
-    if (_un_finished_counter.fetch_sub(1) == 1) {
-        _is_all_finished = true;
-    }
-    set_source_ready();
-}
-
-void DataQueue::set_canceled(int child_idx) {
-    INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> 
l(*_queue_blocks_lock[child_idx]));
-    DCHECK(!_is_finished[child_idx]);
-    _is_canceled[child_idx] = true;
-    _is_finished[child_idx] = true;
-    if (_un_finished_counter.fetch_sub(1) == 1) {
-        _is_all_finished = true;
-    }
     set_source_ready();
 }
 
-bool DataQueue::is_finish(int child_idx) {
-    return _is_finished[child_idx];
-}
-
 bool DataQueue::is_all_finish() {
     return _is_all_finished;
 }
 
 void DataQueue::set_source_ready() {
+    LockGuard lc(_source_lock);
     if (_source_dependency) {
-        std::unique_lock lc(_source_lock);
         _source_dependency->set_ready();
     }
 }
 
 void DataQueue::set_source_block() {
-    if (_cur_blocks_total_nums == 0 && !is_all_finish()) {
-        std::unique_lock lc(_source_lock);
-        // Performing the judgment twice, attempting to avoid blocking the 
source as much as possible.
-        if (_cur_blocks_total_nums == 0 && !is_all_finish()) {
-            _source_dependency->block();
-        }
+    // Re-check under _source_lock to avoid blocking the source when a 
concurrent push
+    // has already added new blocks (or all children have finished) since we 
observed
+    // the counter drop to zero.
+    LockGuard lc(_source_lock);
+    if (_source_dependency && _cur_blocks_total_nums == 0 && !is_all_finish()) 
{
+        _source_dependency->block();
     }
 }
 
diff --git a/be/src/exec/operator/data_queue.h 
b/be/src/exec/operator/data_queue.h
index a3c34d86a1e..0def08c05c4 100644
--- a/be/src/exec/operator/data_queue.h
+++ b/be/src/exec/operator/data_queue.h
@@ -20,10 +20,10 @@
 #include <cstdint>
 #include <deque>
 #include <memory>
-#include <mutex>
 #include <vector>
 
 #include "common/status.h"
+#include "common/thread_safety_annotations.h"
 #include "core/block/block.h"
 
 namespace doris {
@@ -31,6 +31,52 @@ namespace doris {
 
 class Dependency;
 
+// Per child sub-queue. Groups all parallel state so that the lock/field
+// relationship is explicit and can be checked by clang -Wthread-safety.
+struct SubQueue {
+    // Protects the `blocks` deque and serializes high-level state
+    // transitions (push/pop/finish/clear) on this sub-queue.
+    AnnotatedMutex queue_lock;
+    std::deque<std::unique_ptr<Block>> blocks GUARDED_BY(queue_lock);
+
+    // The following fields are only accessed while holding queue_lock.
+    int64_t bytes_in_queue GUARDED_BY(queue_lock) = 0;
+    bool is_finished GUARDED_BY(queue_lock) = false;
+
+    // Protects the `free_blocks` deque only.
+    AnnotatedMutex free_lock;
+    std::deque<std::unique_ptr<Block>> free_blocks GUARDED_BY(free_lock);
+
+    // blocks_in_queue is readable from lock-free fast paths 
(remaining_has_data),
+    // so it remains atomic and is intentionally not GUARDED_BY.
+    std::atomic_uint32_t blocks_in_queue {0};
+
+    // Maximum number of blocks allowed in this sub-queue before the sink is 
blocked.
+    // Updated by DataQueue::set_max_blocks_in_sub_queue / set_low_memory_mode.
+    std::atomic_int64_t max_blocks_in_queue {1};
+
+    // Set once during init via set_sink_dependency, then read-only.
+    Dependency* sink_dependency = nullptr;
+
+    // Pop a block under queue_lock.
+    // Notifies sink_dependency->set_ready() (under queue_lock) if the queue 
becomes empty.
+    // output_block is null if the queue was empty.
+    void try_pop(std::unique_ptr<Block>* output_block);
+
+    // Push a block under queue_lock and atomically increment total_counter.
+    // Returns false (without incrementing) if already finished.
+    // Calls sink_dependency->block() (under queue_lock) if the queue exceeds 
max_blocks_in_queue.
+    bool try_push(std::unique_ptr<Block> block, std::atomic_uint32_t& 
total_counter);
+
+    // Mark this sub-queue finished. Returns false if already finished 
(idempotent).
+    // Decrements unfinished_counter and may set all_finished within 
queue_lock.
+    bool mark_finished(std::atomic_uint32_t& unfinished_counter, 
std::atomic_bool& all_finished);
+
+    // Clear all pending blocks under queue_lock.
+    // Calls sink_dependency->set_always_ready() (outside the lock) if any 
blocks were cleared.
+    void clear_blocks();
+};
+
 class DataQueue {
 public:
     //always one is enough, but in union node it's has more children
@@ -38,64 +84,37 @@ public:
     ~DataQueue() = default;
 
     Status get_block_from_queue(std::unique_ptr<Block>* block, int* child_idx 
= nullptr);
-
     Status push_block(std::unique_ptr<Block> block, int child_idx = 0);
 
     std::unique_ptr<Block> get_free_block(int child_idx = 0);
-
     void push_free_block(std::unique_ptr<Block> output_block, int child_idx = 
0);
 
-    void clear_free_blocks();
-
     void set_finish(int child_idx = 0);
-    void set_canceled(int child_idx = 0); // should set before finish
-    bool is_finish(int child_idx = 0);
     bool is_all_finish();
 
     // This function is not thread safe, should be called in 
Operator::get_block()
     bool remaining_has_data();
+    bool has_more_data() const;
 
-    bool has_more_data() const { return _cur_blocks_total_nums.load() > 0; }
-
-    int64_t max_bytes_in_queue() const { return _max_bytes_in_queue; }
-    int64_t max_size_of_queue() const { return _max_size_of_queue; }
-
-    void set_source_dependency(std::shared_ptr<Dependency> source_dependency) {
-        _source_dependency = source_dependency;
-    }
-    void set_sink_dependency(Dependency* sink_dependency, int child_idx) {
-        _sink_dependencies[child_idx] = sink_dependency;
-    }
-
-    void set_source_ready();
-    void set_source_block();
-
-    void set_max_blocks_in_sub_queue(int64_t max_blocks) { 
_max_blocks_in_sub_queue = max_blocks; }
-
-    void set_low_memory_mode() {
-        _is_low_memory_mode = true;
-        _max_blocks_in_sub_queue = 1;
-        clear_free_blocks();
-    }
+    void set_source_dependency(std::shared_ptr<Dependency> source_dependency)
+            NO_THREAD_SAFETY_ANALYSIS;
+    void set_sink_dependency(Dependency* sink_dependency, int child_idx);
+    void set_max_blocks_in_sub_queue(int64_t max_blocks);
+    void set_low_memory_mode();
 
     void terminate();
 
 private:
-    std::vector<std::unique_ptr<std::mutex>> _queue_blocks_lock;
-    std::vector<std::deque<std::unique_ptr<Block>>> _queue_blocks;
+    void clear_free_blocks();
+    void set_source_ready();
+    void set_source_block();
 
-    std::vector<std::unique_ptr<std::mutex>> _free_blocks_lock;
-    std::vector<std::deque<std::unique_ptr<Block>>> _free_blocks;
+    std::vector<std::unique_ptr<SubQueue>> _sub_queues;
 
     //how many deque will be init, always will be one
     int _child_count = 0;
-    std::vector<std::atomic_bool> _is_finished;
-    std::atomic_uint32_t _un_finished_counter;
+    std::atomic_uint32_t _un_finished_counter = 0;
     std::atomic_bool _is_all_finished = false;
-    std::vector<std::atomic_bool> _is_canceled;
-    // int64_t just for counter of profile
-    std::vector<std::atomic_int64_t> _cur_bytes_in_queue;
-    std::vector<std::atomic_uint32_t> _cur_blocks_nums_in_queue;
     std::atomic_uint32_t _cur_blocks_total_nums = 0;
 
     //this will be indicate which queue has data, it's useful when have many 
queues
@@ -103,17 +122,11 @@ private:
     // only used by streaming agg source operator
 
     std::atomic_bool _is_low_memory_mode = false;
-    std::atomic_int64_t _max_blocks_in_sub_queue = 1;
-
-    //this only use to record the queue[0] for profile
-    int64_t _max_bytes_in_queue = 0;
-    int64_t _max_size_of_queue = 0;
-    static constexpr int64_t MAX_BYTE_OF_QUEUE = 1024L * 1024 * 1024 / 10;
 
-    // data queue is multi sink one source
-    std::shared_ptr<Dependency> _source_dependency = nullptr;
-    std::vector<Dependency*> _sink_dependencies;
-    std::mutex _source_lock;
+    // _source_dependency is written once during initialization 
(set_source_dependency)
+    // and read/used only while holding _source_lock thereafter.
+    std::shared_ptr<Dependency> _source_dependency GUARDED_BY(_source_lock) = 
nullptr;
+    AnnotatedMutex _source_lock;
 };
 
 #include "common/compile_check_end.h"
diff --git a/be/src/exec/operator/exchange_sink_operator.cpp 
b/be/src/exec/operator/exchange_sink_operator.cpp
index ffbe0780d87..14061d610c4 100644
--- a/be/src/exec/operator/exchange_sink_operator.cpp
+++ b/be/src/exec/operator/exchange_sink_operator.cpp
@@ -25,7 +25,6 @@
 #include <algorithm>
 #include <cstdint>
 #include <memory>
-#include <mutex>
 #include <random>
 #include <string>
 
@@ -217,7 +216,7 @@ void ExchangeSinkLocalState::_create_channels() {
 }
 
 void ExchangeSinkLocalState::on_channel_finished(InstanceLoId channel_id) {
-    std::lock_guard<std::mutex> lock(_finished_channels_mutex);
+    LockGuard lock(_finished_channels_mutex);
 
     if (_finished_channels.contains(channel_id)) {
         LOG(WARNING) << "Query: " << print_id(_state->query_id())
diff --git a/be/src/exec/operator/exchange_sink_operator.h 
b/be/src/exec/operator/exchange_sink_operator.h
index 9824e3aaf88..369f03ec6bf 100644
--- a/be/src/exec/operator/exchange_sink_operator.h
+++ b/be/src/exec/operator/exchange_sink_operator.h
@@ -22,9 +22,9 @@
 #include <algorithm>
 #include <atomic>
 #include <memory>
-#include <mutex>
 
 #include "common/status.h"
+#include "common/thread_safety_annotations.h"
 #include "exec/exchange/exchange_writer.h"
 #include "exec/exchange/vdata_stream_sender.h"
 #include "exec/operator/exchange_sink_buffer.h"
@@ -180,8 +180,8 @@ private:
     int _last_local_channel_idx = -1;
 
     std::atomic_int _working_channels_count = 0;
-    std::set<InstanceLoId> _finished_channels;
-    std::mutex _finished_channels_mutex;
+    std::set<InstanceLoId> _finished_channels 
GUARDED_BY(_finished_channels_mutex);
+    AnnotatedMutex _finished_channels_mutex;
 };
 
 class ExchangeSinkOperatorX MOCK_REMOVE(final) : public 
DataSinkOperatorX<ExchangeSinkLocalState> {
diff --git a/be/src/exec/operator/hashjoin_build_sink.cpp 
b/be/src/exec/operator/hashjoin_build_sink.cpp
index fa473a1ea2a..d2d7dcb42c5 100644
--- a/be/src/exec/operator/hashjoin_build_sink.cpp
+++ b/be/src/exec/operator/hashjoin_build_sink.cpp
@@ -67,7 +67,7 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo
         _dependency->block();
         _finish_dependency->block();
         {
-            std::lock_guard<std::mutex> guard(p._mutex);
+            LockGuard guard(p._mutex);
             p._finish_dependencies.push_back(_finish_dependency);
         }
     } else {
@@ -243,10 +243,12 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* 
state, Status exec_statu
         }
 
         if (p._use_shared_hash_table) {
-            std::unique_lock lock(p._mutex);
+            LockGuard lock(p._mutex);
             // Only signal non-builder tasks when the builder actually built 
the hash table.
             // When the builder is terminated early, process_build_block() has 
not initialized the
-            // shared hash table or runtime filter wrappers, so non-builders 
must return EOF.
+            // shared hash table or runtime filter wrappers. In that case the 
hash table variant is
+            // still monostate, so signaling non-builders would make them 
enter std::visit on
+            // monostate and crash with "Hash table type mismatch".
             if (!_terminated) {
                 p._signaled = true;
             }
diff --git a/be/src/exec/operator/hashjoin_build_sink.h 
b/be/src/exec/operator/hashjoin_build_sink.h
index c6f492e8df7..8bde64603ce 100644
--- a/be/src/exec/operator/hashjoin_build_sink.h
+++ b/be/src/exec/operator/hashjoin_build_sink.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include "common/thread_safety_annotations.h"
 #include "exec/operator/join_build_sink_operator.h"
 #include "exec/operator/operator.h"
 #include "exec/runtime_filter/runtime_filter_producer_helper.h"
@@ -197,8 +198,8 @@ private:
 
     bool _use_shared_hash_table = false;
     std::atomic<bool> _signaled = false;
-    std::mutex _mutex;
-    std::vector<std::shared_ptr<Dependency>> _finish_dependencies;
+    AnnotatedMutex _mutex;
+    std::vector<std::shared_ptr<Dependency>> _finish_dependencies 
GUARDED_BY(_mutex);
     std::map<int, std::shared_ptr<RuntimeFilterWrapper>> _runtime_filters;
 };
 
diff --git a/be/src/exec/operator/multi_cast_data_streamer.cpp 
b/be/src/exec/operator/multi_cast_data_streamer.cpp
index 142f7800338..da3129b305c 100644
--- a/be/src/exec/operator/multi_cast_data_streamer.cpp
+++ b/be/src/exec/operator/multi_cast_data_streamer.cpp
@@ -48,10 +48,11 @@ MultiCastBlock::MultiCastBlock(Block* block, int 
un_finish_copy, size_t mem_size
     block->clear();
 }
 
-Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, Block* 
block, bool* eos) {
+Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, Block* 
block,
+                                   bool* eos) NO_THREAD_SAFETY_ANALYSIS {
     MultiCastBlock* multi_cast_block = nullptr;
     {
-        INJECT_MOCK_SLEEP(std::unique_lock l(_mutex));
+        UniqueLock l(_mutex);
         for (auto it = _spill_readers[sender_idx].begin();
              it != _spill_readers[sender_idx].end();) {
             if ((*it)->all_data_read) {
@@ -93,13 +94,15 @@ Status MultiCastDataStreamer::pull(RuntimeState* state, int 
sender_idx, Block* b
             auto spill_func = [this, reader_item, sender_idx]() {
                 Block block;
                 bool spill_eos = false;
+                bool has_cached_blocks = false;
                 size_t read_size = 0;
                 while (!spill_eos) {
                     RETURN_IF_ERROR(reader_item->reader->read(&block, 
&spill_eos));
                     if (!block.empty()) {
-                        std::lock_guard l(_mutex);
+                        LockGuard l(_mutex);
                         read_size += block.allocated_bytes();
                         
_cached_blocks[sender_idx].emplace_back(std::move(block));
+                        has_cached_blocks = true;
                         if (_cached_blocks[sender_idx].size() >= 32 ||
                             read_size > 2 * 1024 * 1024) {
                             break;
@@ -107,7 +110,7 @@ Status MultiCastDataStreamer::pull(RuntimeState* state, int 
sender_idx, Block* b
                     }
                 }
 
-                if (spill_eos || !_cached_blocks[sender_idx].empty()) {
+                if (spill_eos || has_cached_blocks) {
                     reader_item->all_data_read = spill_eos;
                     _set_ready_for_read(sender_idx);
                 }
@@ -159,7 +162,7 @@ Status MultiCastDataStreamer::_copy_block(RuntimeState* 
state, int32_t sender_id
         block->get_by_position(i).column = 
block->get_by_position(i).column->clone_resized(rows);
     }
 
-    INJECT_MOCK_SLEEP(std::lock_guard l(_mutex));
+    LockGuard l(_mutex);
     multi_cast_block._un_finish_copy--;
     auto copying_count = _copying_count.fetch_sub(1) - 1;
     if (multi_cast_block._un_finish_copy == 0) {
@@ -293,7 +296,7 @@ Status MultiCastDataStreamer::push(RuntimeState* state, 
doris::Block* block, boo
     const auto block_mem_size = block->allocated_bytes();
 
     {
-        INJECT_MOCK_SLEEP(std::lock_guard l(_mutex));
+        LockGuard l(_mutex);
         if (_pending_block) {
             DCHECK_GT(_pending_block->rows(), 0);
             const auto pending_size = _pending_block->allocated_bytes();
@@ -346,7 +349,7 @@ Status MultiCastDataStreamer::push(RuntimeState* state, 
doris::Block* block, boo
         _eos = eos;
     }
 
-    if (_eos) {
+    if (eos) {
         for (auto* read_dep : _dependencies) {
             read_dep->set_always_ready();
         }
@@ -377,7 +380,7 @@ std::string MultiCastDataStreamer::debug_string() {
     size_t pos_at_end_count = 0;
     size_t blocks_count = 0;
     {
-        std::unique_lock l(_mutex);
+        LockGuard l(_mutex);
         blocks_count = _multi_cast_blocks.size();
         for (int32_t i = 0; i != _cast_sender_count; ++i) {
             if (!_dependencies[i]->is_blocked_by()) {
diff --git a/be/src/exec/operator/multi_cast_data_streamer.h 
b/be/src/exec/operator/multi_cast_data_streamer.h
index 773576fb2b9..8d62ca78084 100644
--- a/be/src/exec/operator/multi_cast_data_streamer.h
+++ b/be/src/exec/operator/multi_cast_data_streamer.h
@@ -22,6 +22,7 @@
 #include <string>
 #include <vector>
 
+#include "common/thread_safety_annotations.h"
 #include "core/block/block.h"
 #include "exec/exchange/vdata_stream_sender.h"
 #include "exec/pipeline/dependency.h"
@@ -100,16 +101,16 @@ private:
     Status _copy_block(RuntimeState* state, int32_t sender_idx, Block* block,
                        MultiCastBlock& multi_cast_block);
 
-    Status _start_spill_task(RuntimeState* state, SpillFileSPtr spill_file);
+    Status _start_spill_task(RuntimeState* state, SpillFileSPtr spill_file) 
REQUIRES(_mutex);
 
-    Status _trigger_spill_if_need(RuntimeState* state, bool* triggered);
+    Status _trigger_spill_if_need(RuntimeState* state, bool* triggered) 
REQUIRES(_mutex);
 
     RuntimeProfile* _profile = nullptr;
-    std::list<MultiCastBlock> _multi_cast_blocks;
-    std::vector<std::vector<Block>> _cached_blocks;
-    std::vector<std::list<MultiCastBlock>::iterator> _sender_pos_to_read;
-    std::mutex _mutex;
-    bool _eos = false;
+    std::list<MultiCastBlock> _multi_cast_blocks GUARDED_BY(_mutex);
+    std::vector<std::vector<Block>> _cached_blocks GUARDED_BY(_mutex);
+    std::vector<std::list<MultiCastBlock>::iterator> _sender_pos_to_read 
GUARDED_BY(_mutex);
+    AnnotatedMutex _mutex;
+    bool _eos GUARDED_BY(_mutex) = false;
     int _cast_sender_count = 0;
     int _node_id;
     std::atomic_int64_t _cumulative_mem_size = 0;
@@ -120,9 +121,9 @@ private:
     Dependency* _write_dependency;
     std::vector<Dependency*> _dependencies;
 
-    BlockUPtr _pending_block;
+    BlockUPtr _pending_block GUARDED_BY(_mutex);
 
-    std::vector<std::vector<std::shared_ptr<SpillingReader>>> _spill_readers;
+    std::vector<std::vector<std::shared_ptr<SpillingReader>>> _spill_readers 
GUARDED_BY(_mutex);
 
     RuntimeProfile* _sink_operator_profile;
     // operator_profile of each source operator
diff --git a/be/src/exec/operator/partition_sort_sink_operator.cpp 
b/be/src/exec/operator/partition_sort_sink_operator.cpp
index 0f605b70d0d..66ed84d021e 100644
--- a/be/src/exec/operator/partition_sort_sink_operator.cpp
+++ b/be/src/exec/operator/partition_sort_sink_operator.cpp
@@ -128,7 +128,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* 
state, Block* input_block,
             if (local_state._is_need_passthrough) {
                 {
                     COUNTER_UPDATE(local_state._passthrough_rows_counter, 
(int64_t)current_rows);
-                    std::lock_guard<std::mutex> 
lock(local_state._shared_state->buffer_mutex);
+                    LockGuard lock(local_state._shared_state->buffer_mutex);
                     
local_state._shared_state->blocks_buffer.push(std::move(*input_block));
                     // buffer have data, source could read this.
                     local_state._dependency->set_ready_to_read();
@@ -159,8 +159,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* 
state, Block* input_block,
             }
             local_state._value_places[i]->_blocks.clear();
             RETURN_IF_ERROR(sorter->prepare_for_read(false));
-            INJECT_MOCK_SLEEP(std::unique_lock<std::mutex> lc(
-                    local_state._shared_state->prepared_finish_lock));
+            LockGuard lc(local_state._shared_state->prepared_finish_lock);
             sorter->set_prepared_finish();
             // iff one sorter have data, then could set source ready to read
             local_state._dependency->set_ready_to_read();
@@ -171,7 +170,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* 
state, Block* input_block,
                     local_state._sorted_partition_input_rows);
         //so all data from child have sink completed
         {
-            std::unique_lock<std::mutex> 
lc(local_state._shared_state->sink_eos_lock);
+            LockGuard lc(local_state._shared_state->sink_eos_lock);
             local_state._shared_state->sink_eos = true;
             // this ready is also need, as source maybe block by self in some 
case
             local_state._dependency->set_ready_to_read();
@@ -262,8 +261,7 @@ Status PartitionSortSinkOperatorX::_emplace_into_hash_table(
                             {
                                 
COUNTER_UPDATE(local_state._passthrough_rows_counter,
                                                (int64_t)(row + 1));
-                                std::lock_guard<std::mutex> lock(
-                                        
local_state._shared_state->buffer_mutex);
+                                LockGuard 
lock(local_state._shared_state->buffer_mutex);
                                 // have emplace (num_rows - row) to hashtable, 
and now have row remaining needed in block;
                                 // set_num_rows(x) retains the range [0, x - 
1], so row + 1 is needed here.
                                 input_block->set_num_rows(row + 1);
diff --git a/be/src/exec/operator/partition_sort_source_operator.cpp 
b/be/src/exec/operator/partition_sort_source_operator.cpp
index eeaf4683c5a..2e3dad1d32a 100644
--- a/be/src/exec/operator/partition_sort_source_operator.cpp
+++ b/be/src/exec/operator/partition_sort_source_operator.cpp
@@ -41,17 +41,16 @@ Status 
PartitionSortSourceOperatorX::get_block(RuntimeState* state, Block* outpu
     output_block->clear_column_data();
     auto get_data_from_blocks_buffer = false;
     {
-        std::lock_guard<std::mutex> 
lock(local_state._shared_state->buffer_mutex);
+        LockGuard lock(local_state._shared_state->buffer_mutex);
         get_data_from_blocks_buffer = 
!local_state._shared_state->blocks_buffer.empty();
         if (get_data_from_blocks_buffer) {
             
local_state._shared_state->blocks_buffer.front().swap(*output_block);
             local_state._shared_state->blocks_buffer.pop();
 
-            if (local_state._shared_state->blocks_buffer.empty() &&
-                !local_state._shared_state->sink_eos) {
+            if (local_state._shared_state->blocks_buffer.empty()) {
                 // add this mutex to check, as in some case maybe is doing 
block(), and the sink is doing set eos.
                 // so have to hold mutex to set block(), avoid to sink have 
set eos and set ready, but here set block() by mistake
-                std::unique_lock<std::mutex> 
lc(local_state._shared_state->sink_eos_lock);
+                LockGuard lc(local_state._shared_state->sink_eos_lock);
                 //if buffer have no data and sink not eos, block reading and 
wait for signal again
                 if (!local_state._shared_state->sink_eos) {
                     local_state._dependency->block();
@@ -94,7 +93,7 @@ Status 
PartitionSortSourceOperatorX::get_sorted_block(RuntimeState* state, Block
     if (current_eos) {
         // current sort have eos, so get next idx
         local_state._sort_idx++;
-        std::unique_lock<std::mutex> 
lc(local_state._shared_state->prepared_finish_lock);
+        LockGuard lc(local_state._shared_state->prepared_finish_lock);
         if (local_state._sort_idx < sorter_size &&
             !sorters[local_state._sort_idx]->prepared_finish()) {
             local_state._dependency->block();
diff --git a/be/src/exec/operator/scan_operator.cpp 
b/be/src/exec/operator/scan_operator.cpp
index 3d34d32ceb0..87c02aab2b6 100644
--- a/be/src/exec/operator/scan_operator.cpp
+++ b/be/src/exec/operator/scan_operator.cpp
@@ -75,7 +75,7 @@ bool ScanLocalState<Derived>::should_run_serial() const {
 Status ScanLocalStateBase::update_late_arrival_runtime_filter(RuntimeState* 
state,
                                                               int& 
arrived_rf_num) {
     // Lock needed because _conjuncts can be accessed concurrently by multiple 
scanner threads
-    std::unique_lock lock(_conjuncts_lock);
+    LockGuard lock(_conjuncts_lock);
     RETURN_IF_ERROR(_helper.try_append_late_arrival_runtime_filter(state, 
_parent->row_descriptor(),
                                                                    
arrived_rf_num, _conjuncts));
     if (state->enable_adjust_conjunct_order_by_cost()) {
@@ -88,7 +88,7 @@ Status 
ScanLocalStateBase::update_late_arrival_runtime_filter(RuntimeState* stat
 
 Status ScanLocalStateBase::clone_conjunct_ctxs(VExprContextSPtrs& 
scanner_conjuncts) {
     // Lock needed because _conjuncts can be accessed concurrently by multiple 
scanner threads
-    std::unique_lock lock(_conjuncts_lock);
+    LockGuard lock(_conjuncts_lock);
     scanner_conjuncts.resize(_conjuncts.size());
     for (size_t i = 0; i != _conjuncts.size(); ++i) {
         RETURN_IF_ERROR(_conjuncts[i]->clone(_state, scanner_conjuncts[i]));
diff --git a/be/src/exec/operator/scan_operator.h 
b/be/src/exec/operator/scan_operator.h
index d6e2407a8d2..cfffa5a50b3 100644
--- a/be/src/exec/operator/scan_operator.h
+++ b/be/src/exec/operator/scan_operator.h
@@ -18,11 +18,11 @@
 #pragma once
 
 #include <cstdint>
-#include <mutex>
 #include <set>
 #include <string>
 
 #include "common/status.h"
+#include "common/thread_safety_annotations.h"
 #include "core/field.h"
 #include "exec/common/util.hpp"
 #include "exec/operator/operator.h"
@@ -126,7 +126,7 @@ protected:
     RuntimeProfile::Counter* _scan_rows = nullptr;
     RuntimeProfile::Counter* _scan_bytes = nullptr;
 
-    std::mutex _conjuncts_lock;
+    AnnotatedMutex _conjuncts_lock;
     RuntimeFilterConsumerHelper _helper;
     // magic number as seed to generate hash value for condition cache
     uint64_t _condition_cache_digest = 0;
diff --git a/be/src/exec/pipeline/dependency.h 
b/be/src/exec/pipeline/dependency.h
index 28c89b5b990..2c6e3b9aef2 100644
--- a/be/src/exec/pipeline/dependency.h
+++ b/be/src/exec/pipeline/dependency.h
@@ -35,6 +35,7 @@
 
 #include "common/config.h"
 #include "common/logging.h"
+#include "common/thread_safety_annotations.h"
 #include "core/block/block.h"
 #include "core/types.h"
 #include "exec/common/agg_utils.h"
@@ -507,10 +508,10 @@ struct AnalyticSharedState : public BasicSharedState {
 
 public:
     AnalyticSharedState() = default;
-    std::queue<Block> blocks_buffer;
-    std::mutex buffer_mutex;
-    bool sink_eos = false;
-    std::mutex sink_eos_lock;
+    std::queue<Block> blocks_buffer GUARDED_BY(buffer_mutex);
+    AnnotatedMutex buffer_mutex;
+    bool sink_eos GUARDED_BY(sink_eos_lock) = false;
+    AnnotatedMutex sink_eos_lock;
     Arena agg_arena_pool;
 };
 
@@ -598,12 +599,12 @@ struct NestedLoopJoinSharedState : public JoinSharedState 
{
 struct PartitionSortNodeSharedState : public BasicSharedState {
     ENABLE_FACTORY_CREATOR(PartitionSortNodeSharedState)
 public:
-    std::queue<Block> blocks_buffer;
-    std::mutex buffer_mutex;
+    std::queue<Block> blocks_buffer GUARDED_BY(buffer_mutex);
+    AnnotatedMutex buffer_mutex;
     std::vector<std::unique_ptr<PartitionSorter>> partition_sorts;
-    bool sink_eos = false;
-    std::mutex sink_eos_lock;
-    std::mutex prepared_finish_lock;
+    bool sink_eos GUARDED_BY(sink_eos_lock) = false;
+    AnnotatedMutex sink_eos_lock;
+    AnnotatedMutex prepared_finish_lock;
 };
 
 struct SetSharedState : public BasicSharedState {
diff --git a/be/test/exec/pipeline/data_queue_test.cpp 
b/be/test/exec/pipeline/data_queue_test.cpp
index 07ef7723632..f8ba26bc664 100644
--- a/be/test/exec/pipeline/data_queue_test.cpp
+++ b/be/test/exec/pipeline/data_queue_test.cpp
@@ -20,6 +20,7 @@
 #include <gtest/gtest.h>
 
 #include <memory>
+#include <thread>
 #include <vector>
 
 #include "core/data_type/data_type_number.h"
@@ -27,6 +28,173 @@
 
 namespace doris {
 
+// ---------------------------------------------------------------------------
+// Helpers
+// ---------------------------------------------------------------------------
+
+static std::unique_ptr<Block> make_block(size_t rows = 1) {
+    auto block = std::make_unique<Block>();
+    auto col = ColumnUInt8::create(rows);
+    block->insert(ColumnWithTypeAndName {std::move(col), 
std::make_shared<DataTypeUInt8>(), ""});
+    return block;
+}
+
+// Create a test Dependency; initially_ready is forwarded to Dependency::create_shared.
+static std::shared_ptr<Dependency> make_dep(bool initially_ready = true) {
+    return Dependency::create_shared(1, 1, "Test", initially_ready);
+}
+
+// ---------------------------------------------------------------------------
+// SubQueue tests
+// ---------------------------------------------------------------------------
+
+class SubQueueTest : public testing::Test {
+public:
+    void SetUp() override {
+        dep = make_dep(/*initially_ready=*/true);
+        sub = std::make_unique<SubQueue>();
+        sub->sink_dependency = dep.get();
+        sub->max_blocks_in_queue = 2;
+    }
+
+    std::shared_ptr<Dependency> dep;
+    std::unique_ptr<SubQueue> sub;
+    std::atomic_uint32_t counter_ {0};
+};
+
+// Pop from an empty queue leaves the output null and the block count at zero.
+TEST_F(SubQueueTest, TryPopEmpty) {
+    std::unique_ptr<Block> out;
+    sub->try_pop(&out);
+    EXPECT_EQ(out, nullptr);
+    EXPECT_EQ(sub->blocks_in_queue.load(), 0u);
+}
+
+// Basic push then pop returns the block.
+TEST_F(SubQueueTest, TryPushPop_Basic) {
+    EXPECT_TRUE(sub->try_push(make_block(), counter_));
+    EXPECT_EQ(sub->blocks_in_queue.load(), 1u);
+
+    std::unique_ptr<Block> out;
+    sub->try_pop(&out);
+    EXPECT_NE(out, nullptr);
+    EXPECT_EQ(sub->blocks_in_queue.load(), 0u);
+}
+
+// try_push after mark_finished is rejected (returns false).
+TEST_F(SubQueueTest, TryPushAfterFinished) {
+    std::atomic_uint32_t counter {1};
+    std::atomic_bool all_done {false};
+    sub->mark_finished(counter, all_done);
+
+    EXPECT_FALSE(sub->try_push(make_block(), counter_));
+}
+
+// When blocks.size() exceeds max_blocks_in_queue, sink is blocked.
+TEST_F(SubQueueTest, SinkBlockedWhenFull) {
+    sub->max_blocks_in_queue = 2;
+    dep->set_ready(); // start ready
+
+    // Push up to the limit — sink should stay ready.
+    EXPECT_TRUE(sub->try_push(make_block(), counter_));
+    EXPECT_TRUE(sub->try_push(make_block(), counter_));
+    EXPECT_TRUE(dep->ready());
+
+    // Push one over the limit — sink should be blocked.
+    EXPECT_TRUE(sub->try_push(make_block(), counter_));
+    EXPECT_FALSE(dep->ready());
+}
+
+// Sink wakes up only when the queue becomes completely empty.
+TEST_F(SubQueueTest, SinkReadyWhenQueueEmpty) {
+    sub->max_blocks_in_queue = 2;
+
+    // Fill to 3 (one over limit) → sink blocked.
+    EXPECT_TRUE(sub->try_push(make_block(), counter_));
+    EXPECT_TRUE(sub->try_push(make_block(), counter_));
+    EXPECT_TRUE(sub->try_push(make_block(), counter_));
+    EXPECT_FALSE(dep->ready());
+
+    // Pop 1 & 2: queue not empty yet → sink still blocked.
+    std::unique_ptr<Block> out;
+    sub->try_pop(&out);
+    EXPECT_NE(out, nullptr);
+    EXPECT_FALSE(dep->ready());
+
+    sub->try_pop(&out);
+    EXPECT_NE(out, nullptr);
+    EXPECT_FALSE(dep->ready());
+
+    // Pop 3: queue empty → set_ready().
+    sub->try_pop(&out);
+    EXPECT_NE(out, nullptr);
+    EXPECT_TRUE(dep->ready());
+}
+
+// mark_finished is idempotent: second call returns false and counter stays 
correct.
+TEST_F(SubQueueTest, MarkFinishedIdempotent) {
+    std::atomic_uint32_t counter {2};
+    std::atomic_bool all_done {false};
+
+    EXPECT_TRUE(sub->mark_finished(counter, all_done));
+    EXPECT_EQ(counter.load(), 1u);
+    EXPECT_FALSE(all_done.load());
+
+    EXPECT_FALSE(sub->mark_finished(counter, all_done));
+    EXPECT_EQ(counter.load(), 1u); // unchanged
+}
+
+// mark_finished sets all_finished when last child finishes.
+TEST_F(SubQueueTest, MarkFinishedSetsAllFinished) {
+    std::atomic_uint32_t counter {1};
+    std::atomic_bool all_done {false};
+    sub->mark_finished(counter, all_done);
+    EXPECT_TRUE(all_done.load());
+}
+
+// clear_blocks empties the queue and calls set_always_ready on sink.
+TEST_F(SubQueueTest, ClearBlocksEmptiesQueue) {
+    EXPECT_TRUE(sub->try_push(make_block(), counter_));
+    EXPECT_TRUE(sub->try_push(make_block(), counter_));
+    EXPECT_EQ(sub->blocks_in_queue.load(), 2u);
+
+    sub->clear_blocks();
+
+    EXPECT_EQ(sub->blocks_in_queue.load(), 0u);
+    // set_always_ready was called → dep is always ready.
+    EXPECT_TRUE(dep->ready());
+}
+
+// clear_blocks on an empty queue is a no-op (set_always_ready not called).
+TEST_F(SubQueueTest, ClearBlocksNoop) {
+    dep->block(); // start blocked
+    sub->clear_blocks();
+    EXPECT_FALSE(dep->ready()); // still blocked — clear_blocks did nothing
+}
+
+// bytes_in_queue tracks push/pop correctly.
+TEST_F(SubQueueTest, BytesInQueueTracking) {
+    auto block = make_block(10);
+    int64_t expected_bytes = block->allocated_bytes();
+
+    EXPECT_TRUE(sub->try_push(std::move(block), counter_));
+    {
+        LockGuard l(sub->queue_lock);
+        EXPECT_EQ(sub->bytes_in_queue, expected_bytes);
+    }
+
+    std::unique_ptr<Block> out;
+    sub->try_pop(&out);
+    {
+        LockGuard l(sub->queue_lock);
+        EXPECT_EQ(sub->bytes_in_queue, 0);
+    }
+}
+
+// ---------------------------------------------------------------------------
+// DataQueue fixtures
+// ---------------------------------------------------------------------------
+
 class DataQueueTest : public testing::Test {
 public:
     DataQueueTest() = default;
@@ -47,6 +215,140 @@ public:
     const int child_count = 3;
 };
 
+// ---------------------------------------------------------------------------
+// DataQueue unit tests
+// ---------------------------------------------------------------------------
+
+// Initial state: no data, no finish.
+TEST_F(DataQueueTest, InitialState) {
+    EXPECT_FALSE(data_queue->has_more_data());
+    EXPECT_FALSE(data_queue->is_all_finish());
+    EXPECT_FALSE(data_queue->remaining_has_data());
+}
+
+// Push one block and retrieve it.
+TEST_F(DataQueueTest, SinglePushPop) {
+    EXPECT_TRUE(data_queue->push_block(make_block(), 0).ok());
+    EXPECT_TRUE(data_queue->has_more_data());
+
+    // Find the queue with data.
+    EXPECT_TRUE(data_queue->remaining_has_data());
+
+    std::unique_ptr<Block> out;
+    int child_idx = -1;
+    EXPECT_TRUE(data_queue->get_block_from_queue(&out, &child_idx).ok());
+    EXPECT_NE(out, nullptr);
+    EXPECT_EQ(child_idx, 0);
+    EXPECT_FALSE(data_queue->has_more_data());
+}
+
+// is_all_finish only becomes true after all children call set_finish.
+TEST_F(DataQueueTest, IsAllFinishAfterAllChildren) {
+    data_queue->set_finish(0);
+    EXPECT_FALSE(data_queue->is_all_finish());
+    data_queue->set_finish(1);
+    EXPECT_FALSE(data_queue->is_all_finish());
+    data_queue->set_finish(2);
+    EXPECT_TRUE(data_queue->is_all_finish());
+}
+
+// set_finish is idempotent.
+TEST_F(DataQueueTest, SetFinishIdempotent) {
+    data_queue->set_finish(0);
+    data_queue->set_finish(0); // second call must not double-decrement
+    data_queue->set_finish(1);
+    data_queue->set_finish(2);
+    EXPECT_TRUE(data_queue->is_all_finish());
+}
+
+// child_idx returned by get_block_from_queue reflects the actual queue.
+TEST_F(DataQueueTest, ChildIdxReturned) {
+    // Push to child 1 only.
+    EXPECT_TRUE(data_queue->push_block(make_block(), 1).ok());
+    data_queue->remaining_has_data(); // advance _flag_queue_idx to find child 
1
+
+    std::unique_ptr<Block> out;
+    int child_idx = -1;
+    EXPECT_TRUE(data_queue->get_block_from_queue(&out, &child_idx).ok());
+    EXPECT_NE(out, nullptr);
+    EXPECT_EQ(child_idx, 1);
+}
+
+// get_free_block returns a new block when free list is empty, reuses when not.
+TEST_F(DataQueueTest, FreeBlockReuse) {
+    // First call: allocates a new block.
+    auto block = data_queue->get_free_block(0);
+    EXPECT_NE(block, nullptr);
+
+    // Return it to the free list.
+    block->clear(); // ensure rows == 0
+    data_queue->push_free_block(std::move(block), 0);
+
+    // Second call: expected to reuse the recycled block (only non-null is asserted here).
+    auto block2 = data_queue->get_free_block(0);
+    EXPECT_NE(block2, nullptr);
+}
+
+// In low-memory mode push_free_block discards blocks and max drops to 1.
+TEST_F(DataQueueTest, LowMemoryMode) {
+    // Pre-populate the free list.
+    data_queue->push_free_block(Block::create_unique(), 0);
+
+    data_queue->set_low_memory_mode();
+
+    // Free list must be cleared.
+    auto block = data_queue->get_free_block(0);
+    // The free list is empty → a fresh block is returned (not from the list).
+    EXPECT_NE(block, nullptr);
+
+    // push_free_block now discards.
+    block->clear();
+    data_queue->push_free_block(std::move(block), 0);
+    auto block2 = data_queue->get_free_block(0);
+    // Still gets a fresh allocation (free list stays empty).
+    EXPECT_NE(block2, nullptr);
+}
+
+// terminate() finishes all children and clears pending blocks from sub-queues.
+TEST_F(DataQueueTest, Terminate) {
+    EXPECT_TRUE(data_queue->push_block(make_block(), 0).ok());
+    EXPECT_TRUE(data_queue->push_block(make_block(), 1).ok());
+
+    data_queue->terminate();
+
+    EXPECT_TRUE(data_queue->is_all_finish());
+    // remaining_has_data() checks blocks_in_queue per sub-queue,
+    // which clear_blocks() resets to 0.
+    EXPECT_FALSE(data_queue->remaining_has_data());
+}
+
+// set_max_blocks_in_sub_queue propagates to every sub-queue.
+TEST_F(DataQueueTest, SetMaxBlocksInSubQueue) {
+    data_queue->set_max_blocks_in_sub_queue(5);
+    // Push 5 blocks to child 0 — sink must stay ready (not over the limit 
yet).
+    for (int i = 0; i < 5; i++) {
+        EXPECT_TRUE(data_queue->push_block(make_block(), 0).ok());
+    }
+    EXPECT_TRUE(sink_deps[0]->ready());
+
+    // 6th push exceeds limit → sink blocked.
+    EXPECT_TRUE(data_queue->push_block(make_block(), 0).ok());
+    EXPECT_FALSE(sink_deps[0]->ready());
+}
+
+// Source dependency is notified ready when a block is pushed.
+TEST_F(DataQueueTest, SourceReadyOnPush) {
+    source_dep->block(); // start blocked
+    EXPECT_FALSE(source_dep->ready());
+
+    EXPECT_TRUE(data_queue->push_block(make_block(), 0).ok());
+    EXPECT_TRUE(source_dep->ready());
+}
+
+// ---------------------------------------------------------------------------
+// Multi-threaded integration test (existing)
+// ---------------------------------------------------------------------------
+
 TEST_F(DataQueueTest, MultiTest) {
     int output_count = 0;
     auto output_func = [&]() {
@@ -107,14 +409,7 @@ TEST_F(DataQueueTest, MultiTest) {
     output1.join();
 
     EXPECT_EQ(output_count, 150);
-    for (int i = 0; i < 3; i++) {
-        EXPECT_TRUE(data_queue->is_finish(i));
-    }
     EXPECT_TRUE(data_queue->is_all_finish());
-    data_queue->clear_free_blocks();
-    for (int i = 0; i < 3; i++) {
-        EXPECT_TRUE(data_queue->_free_blocks[i].empty());
-    }
 }
 
 // ./run-be-ut.sh --run --filter=DataQueueTest.*
diff --git a/be/test/exec/pipeline/multi_cast_data_streamer_test.cpp 
b/be/test/exec/pipeline/multi_cast_data_streamer_test.cpp
index d50523605d0..61589327909 100644
--- a/be/test/exec/pipeline/multi_cast_data_streamer_test.cpp
+++ b/be/test/exec/pipeline/multi_cast_data_streamer_test.cpp
@@ -316,13 +316,16 @@ TEST_F(MultiCastDataStreamerTest, SpillTest) {
     output2.join();
     output3.join();
 
-    ASSERT_EQ(multi_cast_data_streamer->_multi_cast_blocks.size(), 0);
-    ASSERT_EQ(multi_cast_data_streamer->_cached_blocks[0].size(), 0);
-    ASSERT_EQ(multi_cast_data_streamer->_cached_blocks[1].size(), 0);
-    ASSERT_EQ(multi_cast_data_streamer->_cached_blocks[2].size(), 0);
-    ASSERT_EQ(multi_cast_data_streamer->_spill_readers[0].size(), 0);
-    ASSERT_EQ(multi_cast_data_streamer->_spill_readers[1].size(), 0);
-    ASSERT_EQ(multi_cast_data_streamer->_spill_readers[2].size(), 0);
+    {
+        LockGuard l(multi_cast_data_streamer->_mutex);
+        ASSERT_EQ(multi_cast_data_streamer->_multi_cast_blocks.size(), 0);
+        ASSERT_EQ(multi_cast_data_streamer->_cached_blocks[0].size(), 0);
+        ASSERT_EQ(multi_cast_data_streamer->_cached_blocks[1].size(), 0);
+        ASSERT_EQ(multi_cast_data_streamer->_cached_blocks[2].size(), 0);
+        ASSERT_EQ(multi_cast_data_streamer->_spill_readers[0].size(), 0);
+        ASSERT_EQ(multi_cast_data_streamer->_spill_readers[1].size(), 0);
+        ASSERT_EQ(multi_cast_data_streamer->_spill_readers[2].size(), 0);
+    }
 
     auto debug_string = multi_cast_data_streamer->debug_string();
     EXPECT_TRUE(debug_string.find("MemSize:") != std::string::npos);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to