This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d5df3bae25 [Bug](exchange) fix dcheck fail when VDataStreamRecvr input
empty block (#22992)
d5df3bae25 is described below
commit d5df3bae25ff30bcb3e5c35ca0f374b6e368bb6f
Author: Pxl <[email protected]>
AuthorDate: Wed Aug 16 10:21:19 2023 +0800
[Bug](exchange) fix dcheck fail when VDataStreamRecvr input empty block
(#22992)
fix dcheck fail when VDataStreamRecvr input empty block
---
.../vec/runtime/shared_hash_table_controller.cpp | 3 +--
be/src/vec/runtime/vdata_stream_recvr.cpp | 22 ++++++++++++++++------
be/src/vec/runtime/vdata_stream_recvr.h | 4 +++-
3 files changed, 20 insertions(+), 9 deletions(-)
diff --git a/be/src/vec/runtime/shared_hash_table_controller.cpp
b/be/src/vec/runtime/shared_hash_table_controller.cpp
index 9353bdbbec..490a2cfdcd 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.cpp
+++ b/be/src/vec/runtime/shared_hash_table_controller.cpp
@@ -57,8 +57,7 @@ bool SharedHashTableController::should_build_hash_table(const
TUniqueId& fragmen
SharedHashTableContextPtr SharedHashTableController::get_context(int
my_node_id) {
std::lock_guard<std::mutex> lock(_mutex);
- auto it = _shared_contexts.find(my_node_id);
- if (it == _shared_contexts.cend()) {
+ if (!_shared_contexts.count(my_node_id)) {
_shared_contexts.insert({my_node_id,
std::make_shared<SharedHashTableContext>()});
}
return _shared_contexts[my_node_id];
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 2e0ffaf9a3..0c76df5b9a 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -159,7 +159,11 @@ void VDataStreamRecvr::SenderQueue::add_block(const
PBlock& pblock, int be_numbe
COUNTER_UPDATE(_recvr->_rows_produced_counter, block->rows());
COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1);
- _block_queue.emplace_back(std::move(block), block_byte_size);
+ bool empty = !block->rows();
+
+ if (!empty) {
+ _block_queue.emplace_back(std::move(block), block_byte_size);
+ }
// if done is nullptr, this function can't delay this response
if (done != nullptr && _recvr->exceeds_limit(block_byte_size)) {
MonotonicStopWatch monotonicStopWatch;
@@ -169,7 +173,9 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock&
pblock, int be_numbe
*done = nullptr;
}
_recvr->_blocks_memory_usage->add(block_byte_size);
- _data_arrival_cv.notify_one();
+ if (!empty) {
+ _data_arrival_cv.notify_one();
+ }
}
void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
@@ -205,8 +211,12 @@ void VDataStreamRecvr::SenderQueue::add_block(Block*
block, bool use_move) {
COUNTER_UPDATE(_recvr->_rows_produced_counter, block->rows());
COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1);
- _block_queue.emplace_back(std::move(nblock), block_mem_size);
- _data_arrival_cv.notify_one();
+ bool empty = !nblock->rows();
+
+ if (!empty) {
+ _block_queue.emplace_back(std::move(nblock), block_mem_size);
+ _data_arrival_cv.notify_one();
+ }
if (_recvr->exceeds_limit(block_mem_size)) {
// yiguolei
@@ -384,8 +394,8 @@ bool VDataStreamRecvr::sender_queue_empty(int sender_id) {
}
bool VDataStreamRecvr::ready_to_read() {
- for (size_t i = 0; i < _sender_queues.size(); i++) {
- if (_sender_queues[i]->should_wait()) {
+ for (const auto& queue : _sender_queues) {
+ if (queue->should_wait()) {
return false;
}
}
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h
b/be/src/vec/runtime/vdata_stream_recvr.h
index 03bf6f9db2..eb57c57b0d 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -237,7 +237,9 @@ public:
}
void add_block(Block* block, bool use_move) override {
- if (block->rows() == 0) return;
+ if (block->rows() == 0) {
+ return;
+ }
{
std::unique_lock<std::mutex> l(_lock);
if (_is_cancelled) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]