This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 96a0b60933 [fix](local exchange) fix bug of accessing released counter
of local data stream receiver (#24160)
96a0b60933 is described below
commit 96a0b609338e7a3e7e547f01152a6b4356ac85b6
Author: TengJianPing <[email protected]>
AuthorDate: Mon Sep 11 10:33:38 2023 +0800
[fix](local exchange) fix bug of accessing released counter of local data
stream receiver (#24160)
---
be/src/vec/runtime/vdata_stream_recvr.cpp | 12 ++++++++----
be/src/vec/runtime/vdata_stream_recvr.h | 12 ++++++++++--
2 files changed, 18 insertions(+), 6 deletions(-)
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 2e0ffaf9a3..fcd7d014c6 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -92,7 +92,7 @@ Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block
DCHECK(!_block_queue.empty());
auto [next_block, block_byte_size] = std::move(_block_queue.front());
- _recvr->_blocks_memory_usage->add(-block_byte_size);
+ _recvr->update_blocks_memory_usage(-block_byte_size);
_block_queue.pop_front();
if (!_pending_closures.empty()) {
@@ -168,7 +168,7 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_numbe
_pending_closures.emplace_back(*done, monotonicStopWatch);
*done = nullptr;
}
- _recvr->_blocks_memory_usage->add(block_byte_size);
+ _recvr->update_blocks_memory_usage(block_byte_size);
_data_arrival_cv.notify_one();
}
@@ -208,7 +208,12 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
_block_queue.emplace_back(std::move(nblock), block_mem_size);
_data_arrival_cv.notify_one();
- if (_recvr->exceeds_limit(block_mem_size)) {
+ // Careful: Accessing members of _recvr that are allocated by Object pool
+ // should be done before the following logic, because the _lock will be released
+ // by `iter->second->wait(l)`. After `iter->second->wait(l)` returns, _recvr may
+ // have been closed and the resources in _recvr released.
+ _recvr->update_blocks_memory_usage(block_mem_size);
+ if (_recvr->exceeds_limit(0)) {
// yiguolei
// It is too tricky here, if the running thread is bthread then the tid may be wrong.
std::thread::id tid = std::this_thread::get_id();
@@ -223,7 +228,6 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
iter->second->wait(l);
}
- _recvr->_blocks_memory_usage->add(block_mem_size);
}
void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h
index 03bf6f9db2..0059c8ddf0 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -102,14 +102,21 @@ public:
void close();
+ // Careful: the stream sender will call this function for a local receiver, so
+ // accessing members of the receiver that are allocated by the Object pool
+ // in this function is not safe.
bool exceeds_limit(int batch_size) {
- return _blocks_memory_usage->current_value() + batch_size >
+ return _blocks_memory_usage_current_value + batch_size >
config::exchg_node_buffer_size_bytes;
}
bool is_closed() const { return _is_closed; }
private:
+ void update_blocks_memory_usage(int64_t size) {
+ _blocks_memory_usage->add(size);
+ _blocks_memory_usage_current_value =
_blocks_memory_usage->current_value();
+ }
class SenderQueue;
class PipSenderQueue;
@@ -154,6 +161,7 @@ private:
RuntimeProfile::Counter* _decompress_bytes;
RuntimeProfile::Counter* _memory_usage_counter;
RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage;
+ std::atomic<int64_t> _blocks_memory_usage_current_value = 0;
RuntimeProfile::Counter* _peak_memory_usage_counter;
// Number of rows received
@@ -266,7 +274,7 @@ public:
}
_block_queue.emplace_back(std::move(nblock), block_mem_size);
COUNTER_UPDATE(_recvr->_local_bytes_received_counter,
block_mem_size);
- _recvr->_blocks_memory_usage->add(block_mem_size);
+ _recvr->update_blocks_memory_usage(block_mem_size);
_data_arrival_cv.notify_one();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]