This is an automated email from the ASF dual-hosted git repository.
jacktengg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new 46d3bbd474b [fix](exchange)fix exchange sink buffer does not update
total_queue_size when EOF. (#47312) (#47621)
46d3bbd474b is described below
commit 46d3bbd474b8f72ae30bbfb46599e9fe36cbde96
Author: TengJianPing <[email protected]>
AuthorDate: Sat Feb 8 10:50:41 2025 +0800
[fix](exchange)fix exchange sink buffer does not update total_queue_size
when EOF. (#47312) (#47621)
https://github.com/apache/doris/pull/41602
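When a receiver reaches EOF, its _instance_to_package_queue is cleared but
_total_queue_size is not decreased, so every check that relies on
_total_queue_size (e.g. comparing it against the queue capacity) sees a
stale, too-large value.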
UT output showing the bug (the total is not reduced after instance 2 reaches EOF):
```
mock transmit_blockv2 dest ins id :1
mock transmit_blockv2 dest ins id :2
mock transmit_blockv2 dest ins id :3
queue size : 6
each queue size :
Instance: 2, queue size: 2
Instance: 1, queue size: 2
Instance: 3, queue size: 2
queue size : 6 // wrong size: should be 4 once instance 2's queue is cleared
each queue size :
Instance: 2, queue size: 0
Instance: 1, queue size: 2
Instance: 3, queue size: 2
mock transmit_blockv2 dest ins id :1
mock transmit_blockv2 dest ins id :1
mock transmit_blockv2 dest ins id :3
mock transmit_blockv2 dest ins id :3
```
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
Co-authored-by: Mryange <[email protected]>
---
be/src/pipeline/exec/exchange_sink_buffer.cpp | 24 ++++++++++++++-
be/src/pipeline/exec/exchange_sink_buffer.h | 3 ++
be/src/pipeline/exec/exchange_sink_operator.cpp | 13 ++++----
be/test/vec/exec/exchange_sink_test.cpp | 40 +++++++++++++++++++++++++
4 files changed, 73 insertions(+), 7 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 3fb460c9cc7..a776025c676 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -42,6 +42,7 @@
#include "runtime/exec_env.h"
#include "runtime/thread_context.h"
#include "service/backend_options.h"
+#include "util/defer_op.h"
#include "util/proto_util.h"
#include "util/time.h"
#include "vec/sink/vdata_stream_sender.h"
@@ -445,7 +446,8 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id)
{
// When the receiving side reaches eof, it means the receiver has finished
early.
// The remaining data in the current rpc_channel does not need to be sent,
// and the rpc_channel should be turned off immediately.
- _turn_off_channel(id, lock);
+ Defer turn_off([&]() { _turn_off_channel(id, lock); });
+
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>&
broadcast_q =
_instance_to_broadcast_package_queue[id];
for (; !broadcast_q.empty(); broadcast_q.pop()) {
@@ -461,12 +463,22 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId
id) {
std::queue<TransmitInfo, std::list<TransmitInfo>>& q =
_instance_to_package_queue[id];
for (; !q.empty(); q.pop()) {
+ // Must update _total_queue_size here, otherwise if _total_queue_size
> _queue_capacity at EOF,
+ // ExchangeSinkQueueDependency will be blocked and pipeline will be
deadlocked
+ _total_queue_size--;
if (q.front().block) {
COUNTER_UPDATE(q.front().channel->_parent->memory_used_counter(),
-q.front().block->ByteSizeLong());
}
}
+ // Try to wake up pipeline after clearing the queue
+ if (_total_queue_size <= _queue_capacity) {
+ for (auto& [_, dep] : _queue_deps) {
+ dep->set_ready();
+ }
+ }
+
{
std::queue<TransmitInfo, std::list<TransmitInfo>> empty;
swap(empty, q);
@@ -578,6 +590,16 @@ void ExchangeSinkBuffer::update_profile(RuntimeProfile*
profile) {
}
}
+std::string ExchangeSinkBuffer::debug_each_instance_queue_size() {
+ fmt::memory_buffer debug_string_buffer;
+ for (auto& [id, m] : _instance_to_package_queue_mutex) {
+ std::unique_lock<std::mutex> lock(*m);
+ fmt::format_to(debug_string_buffer, "Instance: {}, queue size: {}\n",
id,
+ _instance_to_package_queue[id].size());
+ }
+ return fmt::to_string(debug_string_buffer);
+}
+
} // namespace pipeline
#include "common/compile_check_end.h"
} // namespace doris
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h
b/be/src/pipeline/exec/exchange_sink_buffer.h
index 899a2991110..51698c118cd 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -250,6 +250,7 @@ public:
}
void set_low_memory_mode() { _queue_capacity = 8; }
+ std::string debug_each_instance_queue_size();
#ifdef BE_TEST
public:
#else
@@ -319,6 +320,8 @@ private:
void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time);
int64_t get_sum_rpc_time();
+ // _total_queue_size is the sum of the sizes of all
instance_to_package_queues.
+ // Any modification to instance_to_package_queue requires a corresponding
modification to _total_queue_size.
std::atomic<int> _total_queue_size = 0;
// _running_sink_count is used to track how many sinks have not finished
yet.
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 573f4aa840e..c6ac3b80d88 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -507,12 +507,13 @@ std::string ExchangeSinkLocalState::debug_string(int
indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}",
Base::debug_string(indentation_level));
if (_sink_buffer) {
- fmt::format_to(debug_string_buffer,
- ", Sink Buffer: (_is_finishing = {}, blocks in queue:
{}, queue capacity: "
- "{}, queue dep: {}), _reach_limit: {}, working
channels: {}",
- _sink_buffer->_is_failed.load(),
_sink_buffer->_total_queue_size,
- _sink_buffer->_queue_capacity,
(void*)_queue_dependency.get(),
- _reach_limit.load(), _working_channels_count.load());
+ fmt::format_to(
+ debug_string_buffer,
+ ", Sink Buffer: (_is_finishing = {}, blocks in queue: {},
queue capacity: "
+ "{}, queue dep: {}), _reach_limit: {}, working channels: {} ,
each queue size: {}",
+ _sink_buffer->_is_failed.load(),
_sink_buffer->_total_queue_size,
+ _sink_buffer->_queue_capacity, (void*)_queue_dependency.get(),
_reach_limit.load(),
+ _working_channels_count.load(),
_sink_buffer->debug_each_instance_queue_size());
}
return fmt::to_string(debug_string_buffer);
}
diff --git a/be/test/vec/exec/exchange_sink_test.cpp
b/be/test/vec/exec/exchange_sink_test.cpp
index 9576ed71ee2..7dbd352bd3a 100644
--- a/be/test/vec/exec/exchange_sink_test.cpp
+++ b/be/test/vec/exec/exchange_sink_test.cpp
@@ -193,4 +193,44 @@ TEST_F(ExchangeSInkTest, test_error_end) {
}
}
+TEST_F(ExchangeSInkTest, test_queue_size) {
+ {
+ auto state = create_runtime_state();
+ auto buffer = create_buffer(state);
+
+ auto sink1 = create_sink(state, buffer);
+
+ EXPECT_EQ(sink1.add_block(dest_ins_id_1, false), Status::OK());
+ EXPECT_EQ(sink1.add_block(dest_ins_id_1, false), Status::OK());
+ EXPECT_EQ(sink1.add_block(dest_ins_id_1, false), Status::OK());
+
+ EXPECT_EQ(sink1.add_block(dest_ins_id_2, false), Status::OK());
+ EXPECT_EQ(sink1.add_block(dest_ins_id_2, false), Status::OK());
+ EXPECT_EQ(sink1.add_block(dest_ins_id_2, false), Status::OK());
+
+ EXPECT_EQ(sink1.add_block(dest_ins_id_3, false), Status::OK());
+ EXPECT_EQ(sink1.add_block(dest_ins_id_3, false), Status::OK());
+ EXPECT_EQ(sink1.add_block(dest_ins_id_3, false), Status::OK());
+
+ std::cout << "queue size : " << buffer->_total_queue_size << "\n";
+
+ EXPECT_EQ(buffer->_total_queue_size, 6);
+
+ std::cout << "each queue size : \n" <<
buffer->debug_each_instance_queue_size() << "\n";
+
+ pop_block(dest_ins_id_2, PopState::eof);
+
+ std::cout << "queue size : " << buffer->_total_queue_size << "\n";
+
+ EXPECT_EQ(buffer->_total_queue_size, 4);
+
+ std::cout << "each queue size : \n" <<
buffer->debug_each_instance_queue_size() << "\n";
+
+ EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_1], false);
+ EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_2], true);
+ EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_3], false);
+ clear_all_done();
+ }
+}
+
} // namespace doris::vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]