This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 378b2f21e22 [opt](exec)lazy deserialize pblock in
VDataStreamRecvr::SenderQueue (#44378)
378b2f21e22 is described below
commit 378b2f21e22b11e8b912b077f2032e63c1ca0c81
Author: Mryange <[email protected]>
AuthorDate: Tue Nov 26 15:41:38 2024 +0800
[opt](exec)lazy deserialize pblock in VDataStreamRecvr::SenderQueue
(#44378)
### What problem does this PR solve?
Previously, a `pblock` (a serialized block) was deserialized immediately
after the RPC request was received, and the resulting block was then placed
into the sender queue's `_block_queue` (the data queue).
Because deserialization ran inside the RPC handler, it accounted for a
significant share of RPC processing time and hurt overall performance.
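The new approach stores the `pblock` in the queue as-is and defers
deserialization until the consumer calls `SenderQueue::get_batch`, which
deserializes it through `BlockItem::get_block`. This has the following
advantages: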
1. Reduces time spent during the RPC handling phase.
2. Memory allocation for deserialization happens within the execution
thread, improving cache locality
and reducing contention on memory resources.
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [x] No need to test or manual test. Explain why:
- [x] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [x] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [x] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
be/src/vec/runtime/vdata_stream_mgr.cpp | 11 +++--
be/src/vec/runtime/vdata_stream_recvr.cpp | 68 +++++++++++++------------------
be/src/vec/runtime/vdata_stream_recvr.h | 49 ++++++++++++++++++----
3 files changed, 76 insertions(+), 52 deletions(-)
diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp
b/be/src/vec/runtime/vdata_stream_mgr.cpp
index 382a6d0e6e3..7dad3d2c867 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.cpp
+++ b/be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -18,10 +18,12 @@
#include "vec/runtime/vdata_stream_mgr.h"
#include <gen_cpp/Types_types.h>
+#include <gen_cpp/data.pb.h>
#include <gen_cpp/internal_service.pb.h>
#include <gen_cpp/types.pb.h>
#include <stddef.h>
+#include <memory>
#include <ostream>
#include <string>
#include <vector>
@@ -141,9 +143,12 @@ Status VDataStreamMgr::transmit_block(const
PTransmitDataParams* request,
bool eos = request->eos();
if (request->has_block()) {
- RETURN_IF_ERROR(recvr->add_block(
- request->block(), request->sender_id(), request->be_number(),
request->packet_seq(),
- eos ? nullptr : done, wait_for_worker,
cpu_time_stop_watch.elapsed_time()));
+ std::unique_ptr<PBlock> pblock_ptr {
+ const_cast<PTransmitDataParams*>(request)->release_block()};
+ RETURN_IF_ERROR(recvr->add_block(std::move(pblock_ptr),
request->sender_id(),
+ request->be_number(),
request->packet_seq(),
+ eos ? nullptr : done, wait_for_worker,
+ cpu_time_stop_watch.elapsed_time()));
}
if (eos) {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index b48b9f780b8..81e4e1cd5f0 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -69,7 +69,6 @@ VDataStreamRecvr::SenderQueue::~SenderQueue() {
}
Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) {
- std::lock_guard<std::mutex> l(_lock); // protect _block_queue
#ifndef NDEBUG
if (!_is_cancelled && _block_queue.empty() && _num_remaining_senders > 0) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
@@ -79,25 +78,33 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block*
block, bool* eos) {
_debug_string_info());
}
#endif
- return _inner_get_batch_without_lock(block, eos);
-}
+ BlockItem block_item;
+ {
+ std::lock_guard<std::mutex> l(_lock);
+ //check and get block_item from data_queue
+ if (_is_cancelled) {
+ RETURN_IF_ERROR(_cancel_status);
+ return Status::Cancelled("Cancelled");
+ }
-Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block*
block, bool* eos) {
- if (_is_cancelled) {
- RETURN_IF_ERROR(_cancel_status);
- return Status::Cancelled("Cancelled");
- }
+ if (_block_queue.empty()) {
+ DCHECK_EQ(_num_remaining_senders, 0);
+ *eos = true;
+ return Status::OK();
+ }
- if (_block_queue.empty()) {
- DCHECK_EQ(_num_remaining_senders, 0);
- *eos = true;
- return Status::OK();
+ DCHECK(!_block_queue.empty());
+ block_item = std::move(_block_queue.front());
+ _block_queue.pop_front();
}
-
- DCHECK(!_block_queue.empty());
- auto [next_block, block_byte_size] = std::move(_block_queue.front());
- _block_queue.pop_front();
+ BlockUPtr next_block;
+ RETURN_IF_ERROR(block_item.get_block(next_block));
+ size_t block_byte_size = block_item.block_byte_size();
+ COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer,
block_item.deserialize_time());
+ COUNTER_UPDATE(_recvr->_decompress_timer, block->get_decompress_time());
+ COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes());
_recvr->_parent->memory_used_counter()->update(-(int64_t)block_byte_size);
+ std::lock_guard<std::mutex> l(_lock);
sub_blocks_memory_usage(block_byte_size);
_record_debug_info();
if (_block_queue.empty() && _source_dependency) {
@@ -133,7 +140,7 @@ void
VDataStreamRecvr::SenderQueue::try_set_dep_ready_without_lock() {
}
}
-Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int
be_number,
+Status VDataStreamRecvr::SenderQueue::add_block(std::unique_ptr<PBlock>
pblock, int be_number,
int64_t packet_seq,
::google::protobuf::Closure**
done,
const int64_t wait_for_worker,
@@ -163,30 +170,12 @@ Status VDataStreamRecvr::SenderQueue::add_block(const
PBlock& pblock, int be_num
}
}
- BlockUPtr block = nullptr;
- int64_t deserialize_time = 0;
- {
- SCOPED_RAW_TIMER(&deserialize_time);
- block = Block::create_unique();
- RETURN_IF_ERROR_OR_CATCH_EXCEPTION(block->deserialize(pblock));
- }
-
- const auto rows = block->rows();
- if (rows == 0) {
- return Status::OK();
- }
- auto block_byte_size = block->allocated_bytes();
- VLOG_ROW << "added #rows=" << rows << " batch_size=" << block_byte_size <<
"\n";
-
std::lock_guard<std::mutex> l(_lock);
if (_is_cancelled) {
return Status::OK();
}
- COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer, deserialize_time);
- COUNTER_UPDATE(_recvr->_decompress_timer, block->get_decompress_time());
- COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes());
- COUNTER_UPDATE(_recvr->_rows_produced_counter, rows);
+ const auto block_byte_size = pblock->ByteSizeLong();
COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1);
if (_recvr->_max_wait_worker_time->value() < wait_for_worker) {
_recvr->_max_wait_worker_time->set(wait_for_worker);
@@ -196,7 +185,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(const
PBlock& pblock, int be_num
_recvr->_max_find_recvr_time->set((int64_t)time_to_find_recvr);
}
- _block_queue.emplace_back(std::move(block), block_byte_size);
+ _block_queue.emplace_back(std::move(pblock), block_byte_size);
COUNTER_UPDATE(_recvr->_remote_bytes_received_counter, block_byte_size);
_record_debug_info();
try_set_dep_ready_without_lock();
@@ -370,7 +359,6 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr*
stream_mgr, pipeline::Exchang
_first_batch_wait_total_timer = ADD_TIMER(_profile,
"FirstBatchArrivalWaitTime");
_decompress_timer = ADD_TIMER(_profile, "DecompressTime");
_decompress_bytes = ADD_COUNTER(_profile, "DecompressBytes", TUnit::BYTES);
- _rows_produced_counter = ADD_COUNTER(_profile, "RowsProduced",
TUnit::UNIT);
_blocks_produced_counter = ADD_COUNTER(_profile, "BlocksProduced",
TUnit::UNIT);
_max_wait_worker_time = ADD_COUNTER(_profile, "MaxWaitForWorkerTime",
TUnit::UNIT);
_max_wait_to_process_time = ADD_COUNTER(_profile, "MaxWaitToProcessTime",
TUnit::UNIT);
@@ -401,13 +389,13 @@ Status VDataStreamRecvr::create_merger(const
VExprContextSPtrs& ordering_expr,
return Status::OK();
}
-Status VDataStreamRecvr::add_block(const PBlock& pblock, int sender_id, int
be_number,
+Status VDataStreamRecvr::add_block(std::unique_ptr<PBlock> pblock, int
sender_id, int be_number,
int64_t packet_seq,
::google::protobuf::Closure** done,
const int64_t wait_for_worker,
const uint64_t time_to_find_recvr) {
SCOPED_ATTACH_TASK(_query_thread_context);
int use_sender_id = _is_merging ? sender_id : 0;
- return _sender_queues[use_sender_id]->add_block(pblock, be_number,
packet_seq, done,
+ return _sender_queues[use_sender_id]->add_block(std::move(pblock),
be_number, packet_seq, done,
wait_for_worker,
time_to_find_recvr);
}
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h
b/be/src/vec/runtime/vdata_stream_recvr.h
index 08fb004f3b1..1639366c8b8 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -18,6 +18,7 @@
#pragma once
#include <gen_cpp/Types_types.h>
+#include <gen_cpp/data.pb.h>
#include <glog/logging.h>
#include <google/protobuf/stubs/callback.h>
@@ -84,9 +85,9 @@ public:
std::vector<SenderQueue*> sender_queues() const { return _sender_queues; }
- Status add_block(const PBlock& pblock, int sender_id, int be_number,
int64_t packet_seq,
- ::google::protobuf::Closure** done, const int64_t
wait_for_worker,
- const uint64_t time_to_find_recvr);
+ Status add_block(std::unique_ptr<PBlock> pblock, int sender_id, int
be_number,
+ int64_t packet_seq, ::google::protobuf::Closure** done,
+ const int64_t wait_for_worker, const uint64_t
time_to_find_recvr);
void add_block(Block* block, int sender_id, bool use_move);
@@ -157,8 +158,6 @@ private:
RuntimeProfile::Counter* _decompress_timer = nullptr;
RuntimeProfile::Counter* _decompress_bytes = nullptr;
- // Number of rows received
- RuntimeProfile::Counter* _rows_produced_counter = nullptr;
// Number of blocks received
RuntimeProfile::Counter* _blocks_produced_counter = nullptr;
RuntimeProfile::Counter* _max_wait_worker_time = nullptr;
@@ -181,7 +180,7 @@ public:
Status get_batch(Block* next_block, bool* eos);
- Status add_block(const PBlock& pblock, int be_number, int64_t packet_seq,
+ Status add_block(std::unique_ptr<PBlock> pblock, int be_number, int64_t
packet_seq,
::google::protobuf::Closure** done, const int64_t
wait_for_worker,
const uint64_t time_to_find_recvr);
@@ -205,8 +204,6 @@ public:
protected:
friend class pipeline::ExchangeLocalState;
- Status _inner_get_batch_without_lock(Block* block, bool* eos);
-
void try_set_dep_ready_without_lock();
// To record information about several variables in the event of a DCHECK
failure.
@@ -260,7 +257,41 @@ protected:
Status _cancel_status;
int _num_remaining_senders;
std::unique_ptr<MemTracker> _queue_mem_tracker;
- std::list<std::pair<BlockUPtr, size_t>> _block_queue;
+
+ // `BlockItem` is used in `_block_queue` to handle both local and remote
exchange blocks.
+ // For local exchange blocks, `BlockUPtr` is used directly without any
modification.
+ // For remote exchange blocks, the `pblock` is stored in `BlockItem`.
+ // When `getBlock` is called, the `pblock` is deserialized into a usable
block.
+ struct BlockItem {
+ Status get_block(BlockUPtr& block) {
+ if (!_block) {
+ DCHECK(_pblock);
+ SCOPED_RAW_TIMER(&_deserialize_time);
+ _block = Block::create_unique();
+
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_block->deserialize(*_pblock));
+ }
+ block.swap(_block);
+ _block.reset();
+ return Status::OK();
+ }
+
+ size_t block_byte_size() const { return _block_byte_size; }
+ int64_t deserialize_time() const { return _deserialize_time; }
+ BlockItem() = default;
+ BlockItem(BlockUPtr&& block, size_t block_byte_size)
+ : _block(std::move(block)), _block_byte_size(block_byte_size)
{}
+
+ BlockItem(std::unique_ptr<PBlock>&& pblock, size_t block_byte_size)
+ : _block(nullptr), _pblock(std::move(pblock)),
_block_byte_size(block_byte_size) {}
+
+ private:
+ BlockUPtr _block;
+ std::unique_ptr<PBlock> _pblock;
+ size_t _block_byte_size = 0;
+ int64_t _deserialize_time = 0;
+ };
+
+ std::list<BlockItem> _block_queue;
// sender_id
std::unordered_set<int> _sender_eos_set;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]