This is an automated email from the ASF dual-hosted git repository.
Mryange pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 17bbba45a52 [fix](pipeline) avoid data queue sink dependency lost
wakeup (#63055)
17bbba45a52 is described below
commit 17bbba45a52e6e6ace99773930f83dc1569897f7
Author: Mryange <[email protected]>
AuthorDate: Thu May 7 21:03:37 2026 +0800
[fix](pipeline) avoid data queue sink dependency lost wakeup (#63055)
### What problem does this PR solve?
Issue Number: N/A
Problem Summary:
`DataQueueTest.MultiTest` could intermittently hang after DataQueue
moved sink dependency notifications outside the per-sub-queue lock. Root
cause: `SubQueue` queue state and `sink_dependency` state were no longer
serialized by `queue_lock`, so a producer could observe its sink
dependency as blocked even after the queue had already become empty,
leaving no future push/pop to wake it. This patch updates
`sink_dependency->set_ready()` and `sink_dependency->block()` while
holding `queue_lock`, keeping queue occupancy and sink readiness
transitions atomic with respect to each other.
Related PR: https://github.com/apache/doris/pull/62947
---
be/src/exec/operator/data_queue.cpp | 44 ++++++++++++++-----------------------
1 file changed, 17 insertions(+), 27 deletions(-)
diff --git a/be/src/exec/operator/data_queue.cpp
b/be/src/exec/operator/data_queue.cpp
index f460103d2ea..752e972c3e6 100644
--- a/be/src/exec/operator/data_queue.cpp
+++ b/be/src/exec/operator/data_queue.cpp
@@ -29,38 +29,28 @@
namespace doris {
void SubQueue::try_pop(std::unique_ptr<Block>* output_block) {
- bool need_notify_sink_ready = false;
- {
- LockGuard l(queue_lock);
- if (!blocks.empty()) {
- *output_block = std::move(blocks.front());
- blocks.pop_front();
- bytes_in_queue -= (*output_block)->allocated_bytes();
- blocks_in_queue -= 1;
- need_notify_sink_ready = blocks.empty();
+ LockGuard l(queue_lock);
+ if (!blocks.empty()) {
+ *output_block = std::move(blocks.front());
+ blocks.pop_front();
+ bytes_in_queue -= (*output_block)->allocated_bytes();
+ blocks_in_queue -= 1;
+ if (blocks.empty()) {
+ sink_dependency->set_ready();
}
}
- // Notify outside of queue_lock to avoid nested locks.
- if (need_notify_sink_ready) {
- sink_dependency->set_ready();
- }
}
bool SubQueue::try_push(std::unique_ptr<Block> block, std::atomic_uint32_t&
total_counter) {
- bool need_block_sink = false;
- {
- LockGuard l(queue_lock);
- if (is_finished) {
- return false;
- }
- total_counter++;
- bytes_in_queue += block->allocated_bytes();
- blocks.emplace_back(std::move(block));
- blocks_in_queue += 1;
- need_block_sink = (static_cast<int64_t>(blocks.size()) >
max_blocks_in_queue.load());
- }
- // Notify outside of queue_lock to avoid nested locks.
- if (need_block_sink) {
+ LockGuard l(queue_lock);
+ if (is_finished) {
+ return false;
+ }
+ total_counter++;
+ bytes_in_queue += block->allocated_bytes();
+ blocks.emplace_back(std::move(block));
+ blocks_in_queue += 1;
+ if (static_cast<int64_t>(blocks.size()) > max_blocks_in_queue.load()) {
sink_dependency->block();
}
return true;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]