This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 04e993c1de8 [refine](pipeline) refine some VDataStreamRecvr code
(#35063) (#37802)
04e993c1de8 is described below
commit 04e993c1de8802a3dbea44710e399c04b6aae5ff
Author: Mryange <[email protected]>
AuthorDate: Thu Aug 22 19:55:17 2024 +0800
[refine](pipeline) refine some VDataStreamRecvr code (#35063) (#37802)
## Proposed changes
https://github.com/apache/doris/pull/35063
https://github.com/apache/doris/pull/35428
---
be/src/vec/runtime/vdata_stream_recvr.cpp | 60 ++++++++++++++++++-------------
be/src/vec/runtime/vdata_stream_recvr.h | 20 +++++------
2 files changed, 46 insertions(+), 34 deletions(-)
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index cb483e986c8..912ecf53989 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -49,6 +49,7 @@ VDataStreamRecvr::SenderQueue::SenderQueue(VDataStreamRecvr*
parent_recvr, int n
_num_remaining_senders(num_senders),
_received_first_batch(false) {
_cancel_status = Status::OK();
+ _queue_mem_tracker = std::make_unique<MemTracker>("local data queue mem
tracker");
}
VDataStreamRecvr::SenderQueue::~SenderQueue() {
@@ -98,17 +99,14 @@ Status
VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block
DCHECK(!_block_queue.empty());
auto [next_block, block_byte_size] = std::move(_block_queue.front());
- update_blocks_memory_usage(-block_byte_size);
_block_queue.pop_front();
+ sub_blocks_memory_usage(block_byte_size);
_record_debug_info();
if (_block_queue.empty() && _source_dependency) {
if (!_is_cancelled && _num_remaining_senders > 0) {
_source_dependency->block();
}
}
- if (_local_channel_dependency) {
- _local_channel_dependency->set_ready();
- }
if (!_pending_closures.empty()) {
auto closure_pair = _pending_closures.front();
@@ -136,9 +134,6 @@ void
VDataStreamRecvr::SenderQueue::try_set_dep_ready_without_lock() {
Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int
be_number,
int64_t packet_seq,
::google::protobuf::Closure**
done) {
- const auto pblock_byte_size = pblock.ByteSizeLong();
- COUNTER_UPDATE(_recvr->_bytes_received_counter, pblock_byte_size);
-
{
std::lock_guard<std::mutex> l(_lock);
if (_is_cancelled) {
@@ -191,6 +186,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(const
PBlock& pblock, int be_num
COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1);
_block_queue.emplace_back(std::move(block), block_byte_size);
+ COUNTER_UPDATE(_recvr->_remote_bytes_received_counter, block_byte_size);
_record_debug_info();
try_set_dep_ready_without_lock();
@@ -202,7 +198,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(const
PBlock& pblock, int be_num
_pending_closures.emplace_back(*done, monotonicStopWatch);
*done = nullptr;
}
- update_blocks_memory_usage(block_byte_size);
+ add_blocks_memory_usage(block_byte_size);
_data_arrival_cv.notify_one();
return Status::OK();
}
@@ -216,7 +212,6 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block,
bool use_move) {
}
}
- auto block_bytes_received = block->bytes();
// Has to use unique ptr here, because cloning a column may fail if memory
allocation failed.
BlockUPtr nblock =
Block::create_unique(block->get_columns_with_type_and_name());
@@ -236,11 +231,11 @@ void VDataStreamRecvr::SenderQueue::add_block(Block*
block, bool use_move) {
if (_is_cancelled) {
return;
}
- COUNTER_UPDATE(_recvr->_local_bytes_received_counter,
block_bytes_received);
COUNTER_UPDATE(_recvr->_rows_produced_counter, rows);
COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1);
_block_queue.emplace_back(std::move(nblock), block_mem_size);
+ COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size);
_record_debug_info();
try_set_dep_ready_without_lock();
_data_arrival_cv.notify_one();
@@ -249,7 +244,7 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block,
bool use_move) {
// should be done before the following logic, because the _lock will be
released
// by `iter->second->wait(l)`, after `iter->second->wait(l)` returns,
_recvr may
// have been closed and resources in _recvr are released;
- update_blocks_memory_usage(block_mem_size);
+ add_blocks_memory_usage(block_mem_size);
if (_recvr->exceeds_limit(0)) {
// yiguolei
// It is too tricky here, if the running thread is bthread then the
tid may be wrong.
@@ -347,7 +342,7 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr*
stream_mgr, RuntimeState* sta
_is_closed(false),
_profile(profile),
_peak_memory_usage_counter(nullptr),
- _enable_pipeline(state->enable_pipeline_exec()) {
+ _enable_pipeline(state->enable_pipeline_x_exec()) {
// DataStreamRecvr may be destructed after the instance execution thread
ends.
_mem_tracker =
std::make_unique<MemTracker>("VDataStreamRecvr:" +
print_id(_fragment_instance_id));
@@ -364,6 +359,7 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr*
stream_mgr, RuntimeState* sta
}
_sender_queues.reserve(num_queues);
int num_sender_per_queue = is_merging ? 1 : num_senders;
+ _sender_queue_mem_limit = std::max(20480,
config::exchg_node_buffer_size_bytes / num_queues);
for (int i = 0; i < num_queues; ++i) {
SenderQueue* queue = nullptr;
if (_enable_pipeline) {
@@ -379,10 +375,9 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr*
stream_mgr, RuntimeState* sta
// Initialize the counters
_memory_usage_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage");
- _blocks_memory_usage = _profile->AddHighWaterMarkCounter("Blocks",
TUnit::BYTES, "MemoryUsage");
_peak_memory_usage_counter =
- _profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES,
"MemoryUsage");
- _bytes_received_counter = ADD_COUNTER(_profile, "BytesReceived",
TUnit::BYTES);
+ _profile->add_counter("PeakMemoryUsage", TUnit::BYTES,
"MemoryUsage");
+ _remote_bytes_received_counter = ADD_COUNTER(_profile,
"RemoteBytesReceived", TUnit::BYTES);
_local_bytes_received_counter = ADD_COUNTER(_profile,
"LocalBytesReceived", TUnit::BYTES);
_deserialize_row_batch_timer = ADD_TIMER(_profile,
"DeserializeRowBatchTimer");
@@ -428,7 +423,6 @@ Status VDataStreamRecvr::add_block(const PBlock& pblock,
int sender_id, int be_n
}
void VDataStreamRecvr::add_block(Block* block, int sender_id, bool use_move) {
- _mem_tracker->consume(block->allocated_bytes());
int use_sender_id = _is_merging ? sender_id : 0;
_sender_queues[use_sender_id]->add_block(block, use_move);
}
@@ -455,7 +449,6 @@ bool VDataStreamRecvr::ready_to_read() {
Status VDataStreamRecvr::get_next(Block* block, bool* eos) {
_peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
- Defer release_mem([&]() { _mem_tracker->release(block->allocated_bytes());
});
if (!_is_merging) {
block->clear();
return _sender_queues[0]->get_batch(block, eos);
@@ -482,16 +475,35 @@ void VDataStreamRecvr::cancel_stream(Status exec_status) {
}
}
-void VDataStreamRecvr::SenderQueue::update_blocks_memory_usage(int64_t size) {
- _recvr->update_blocks_memory_usage(size);
- if (_local_channel_dependency && _recvr->exceeds_limit(0)) {
+void VDataStreamRecvr::SenderQueue::add_blocks_memory_usage(int64_t size) {
+ DCHECK(size >= 0);
+ _recvr->_mem_tracker->consume(size);
+ _queue_mem_tracker->consume(size);
+ if (_local_channel_dependency && exceeds_limit()) {
_local_channel_dependency->block();
}
}
-void VDataStreamRecvr::update_blocks_memory_usage(int64_t size) {
- _blocks_memory_usage->add(size);
- _blocks_memory_usage_current_value.fetch_add(size);
+void VDataStreamRecvr::SenderQueue::sub_blocks_memory_usage(int64_t size) {
+ DCHECK(size >= 0);
+ _recvr->_mem_tracker->release(size);
+ _queue_mem_tracker->release(size);
+ if (_local_channel_dependency && (!exceeds_limit())) {
+ _local_channel_dependency->set_ready();
+ }
+}
+
+bool VDataStreamRecvr::SenderQueue::exceeds_limit() {
+ const size_t queue_byte_size = _queue_mem_tracker->consumption();
+ return _recvr->queue_exceeds_limit(queue_byte_size);
+}
+
+bool VDataStreamRecvr::exceeds_limit(size_t block_byte_size) {
+ return _mem_tracker->consumption() + block_byte_size >
config::exchg_node_buffer_size_bytes;
+}
+
+bool VDataStreamRecvr::queue_exceeds_limit(size_t queue_byte_size) const {
+ return queue_byte_size >= _sender_queue_mem_limit;
}
void VDataStreamRecvr::close() {
@@ -550,7 +562,7 @@ void VDataStreamRecvr::PipSenderQueue::add_block(Block*
block, bool use_move) {
_record_debug_info();
try_set_dep_ready_without_lock();
COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size);
- update_blocks_memory_usage(block_mem_size);
+ add_blocks_memory_usage(block_mem_size);
_data_arrival_cv.notify_one();
}
}
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h
b/be/src/vec/runtime/vdata_stream_recvr.h
index cb44565e8c2..d447e5686e9 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -111,17 +111,13 @@ public:
// Careful: stream sender will call this function for a local receiver,
// accessing members of receiver that are allocated by Object pool
// in this function is not safe.
- bool exceeds_limit(int batch_size) {
- return _blocks_memory_usage_current_value + batch_size >
- config::exchg_node_buffer_size_bytes;
- }
-
+ bool exceeds_limit(size_t block_byte_size);
+ bool queue_exceeds_limit(size_t byte_size) const;
bool is_closed() const { return _is_closed; }
std::shared_ptr<pipeline::Dependency> get_local_channel_dependency(int
sender_id);
private:
- void update_blocks_memory_usage(int64_t size);
class PipSenderQueue;
friend struct BlockSupplierSortCursorImpl;
@@ -146,13 +142,14 @@ private:
std::unique_ptr<MemTracker> _mem_tracker;
// Managed by object pool
std::vector<SenderQueue*> _sender_queues;
+ size_t _sender_queue_mem_limit;
std::unique_ptr<VSortedRunMerger> _merger;
ObjectPool _sender_queue_pool;
RuntimeProfile* _profile = nullptr;
- RuntimeProfile::Counter* _bytes_received_counter = nullptr;
+ RuntimeProfile::Counter* _remote_bytes_received_counter = nullptr;
RuntimeProfile::Counter* _local_bytes_received_counter = nullptr;
RuntimeProfile::Counter* _deserialize_row_batch_timer = nullptr;
RuntimeProfile::Counter* _first_batch_wait_total_timer = nullptr;
@@ -161,8 +158,6 @@ private:
RuntimeProfile::Counter* _decompress_timer = nullptr;
RuntimeProfile::Counter* _decompress_bytes = nullptr;
RuntimeProfile::Counter* _memory_usage_counter = nullptr;
- RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage = nullptr;
- std::atomic<int64_t> _blocks_memory_usage_current_value = 0;
RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr;
// Number of rows received
@@ -222,7 +217,11 @@ public:
_source_dependency = dependency;
}
- void update_blocks_memory_usage(int64_t size);
+ void add_blocks_memory_usage(int64_t size);
+
+ void sub_blocks_memory_usage(int64_t size);
+
+ bool exceeds_limit();
protected:
friend class pipeline::ExchangeLocalState;
@@ -282,6 +281,7 @@ protected:
int _num_remaining_senders;
std::condition_variable _data_arrival_cv;
std::condition_variable _data_removal_cv;
+ std::unique_ptr<MemTracker> _queue_mem_tracker;
std::list<std::pair<BlockUPtr, size_t>> _block_queue;
bool _received_first_batch;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]