This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0896aefce3 [fix](local exchange) fix bug of accesssing released
counter of local data stream receiver (#24148)
0896aefce3 is described below
commit 0896aefce37383856c44789f8aad2711b125e284
Author: TengJianPing <[email protected]>
AuthorDate: Mon Sep 11 09:52:31 2023 +0800
[fix](local exchange) fix bug of accesssing released counter of local data
stream receiver (#24148)
---
be/src/vec/runtime/vdata_stream_recvr.cpp | 13 ++++++++-----
be/src/vec/runtime/vdata_stream_recvr.h | 12 ++++++++++--
2 files changed, 18 insertions(+), 7 deletions(-)
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index cc908d47e0..30588538d7 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -92,7 +92,7 @@ Status
VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block
DCHECK(!_block_queue.empty());
auto [next_block, block_byte_size] = std::move(_block_queue.front());
- _recvr->_blocks_memory_usage->add(-block_byte_size);
+ _recvr->update_blocks_memory_usage(-block_byte_size);
_block_queue.pop_front();
if (!_pending_closures.empty()) {
@@ -173,7 +173,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(const
PBlock& pblock, int be_num
_pending_closures.emplace_back(*done, monotonicStopWatch);
*done = nullptr;
}
- _recvr->_blocks_memory_usage->add(block_byte_size);
+ _recvr->update_blocks_memory_usage(block_byte_size);
if (!empty) {
_data_arrival_cv.notify_one();
}
@@ -220,7 +220,12 @@ void VDataStreamRecvr::SenderQueue::add_block(Block*
block, bool use_move) {
_data_arrival_cv.notify_one();
}
- if (_recvr->exceeds_limit(block_mem_size)) {
+ // Careful: Accessing members of _recvr that are allocated by Object pool
+ // should be done before the following logic, because the _lock will be
released
+ // by `iter->second->wait(l)`, after `iter->second->wait(l)` returns,
_recvr may
+ // have been closed and resouces in _recvr are released;
+ _recvr->update_blocks_memory_usage(block_mem_size);
+ if (_recvr->exceeds_limit(0)) {
// yiguolei
// It is too tricky here, if the running thread is bthread then the
tid may be wrong.
std::thread::id tid = std::this_thread::get_id();
@@ -234,8 +239,6 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block,
bool use_move) {
_pending_closures.emplace_back(iter->second.get(), monotonicStopWatch);
iter->second->wait(l);
}
-
- _recvr->_blocks_memory_usage->add(block_mem_size);
}
void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h
b/be/src/vec/runtime/vdata_stream_recvr.h
index fe9910492b..d79e9ed90a 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -102,14 +102,21 @@ public:
void close();
+ // Careful: stream sender will call this function for a local receiver,
+ // accessing members of receiver that are allocated by Object pool
+ // in this function is not safe.
bool exceeds_limit(int batch_size) {
- return _blocks_memory_usage->current_value() + batch_size >
+ return _blocks_memory_usage_current_value + batch_size >
config::exchg_node_buffer_size_bytes;
}
bool is_closed() const { return _is_closed; }
private:
+ void update_blocks_memory_usage(int64_t size) {
+ _blocks_memory_usage->add(size);
+ _blocks_memory_usage_current_value =
_blocks_memory_usage->current_value();
+ }
class SenderQueue;
class PipSenderQueue;
@@ -154,6 +161,7 @@ private:
RuntimeProfile::Counter* _decompress_bytes;
RuntimeProfile::Counter* _memory_usage_counter;
RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage;
+ std::atomic<int64_t> _blocks_memory_usage_current_value = 0;
RuntimeProfile::Counter* _peak_memory_usage_counter;
// Number of rows received
@@ -268,7 +276,7 @@ public:
}
_block_queue.emplace_back(std::move(nblock), block_mem_size);
COUNTER_UPDATE(_recvr->_local_bytes_received_counter,
block_mem_size);
- _recvr->_blocks_memory_usage->add(block_mem_size);
+ _recvr->update_blocks_memory_usage(block_mem_size);
_data_arrival_cv.notify_one();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]