This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9a1402da070 [opt](scanner) use buffered queue to avoid acquiring locks frequently (#29938)
9a1402da070 is described below
commit 9a1402da070bd8833715b0099af06b929803532b
Author: Jerry Hu <[email protected]>
AuthorDate: Mon Jan 15 10:28:20 2024 +0800
[opt](scanner) use buffered queue to avoid acquiring locks frequently (#29938)
---
be/src/vec/exec/scan/pip_scanner_context.h | 85 +++++++++++++++++++++---------
1 file changed, 60 insertions(+), 25 deletions(-)
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h
index 8f79e3021a5..77237bb71af 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -289,42 +289,51 @@ public:
dependency) {}
Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr*
block, bool* eos,
int id, bool wait = false) override {
- std::unique_lock l(_transfer_lock);
- if (state->is_cancelled()) {
- set_status_on_error(Status::Cancelled("cancelled"), false);
- }
+ if (_blocks_queue_buffered.empty()) {
+ std::unique_lock l(_transfer_lock);
+ if (state->is_cancelled()) {
+ set_status_on_error(Status::Cancelled("cancelled"), false);
+ }
- if (!status().ok()) {
- return _process_status;
- }
+ if (!status().ok()) {
+ return _process_status;
+ }
- std::vector<vectorized::BlockUPtr> merge_blocks;
- if (_blocks_queue.empty()) {
- *eos = done();
- return Status::OK();
- }
- if (_process_status.is<ErrorCode::CANCELLED>()) {
- *eos = true;
- return Status::OK();
+ if (_blocks_queue.empty()) {
+ *eos = done();
+ return Status::OK();
+ }
+ if (_process_status.is<ErrorCode::CANCELLED>()) {
+ *eos = true;
+ return Status::OK();
+ }
+
+ _blocks_queue_buffered = std::move(_blocks_queue);
}
- *block = std::move(_blocks_queue.front());
- _blocks_queue.pop_front();
+ *block = std::move(_blocks_queue_buffered.front());
+ _blocks_queue_buffered.pop_front();
+ std::vector<vectorized::BlockUPtr> merge_blocks;
auto rows = (*block)->rows();
- while (!_blocks_queue.empty()) {
- const auto add_rows = (*_blocks_queue.front()).rows();
+ while (!_blocks_queue_buffered.empty()) {
+ const auto add_rows = (*_blocks_queue_buffered.front()).rows();
if (rows + add_rows < state->batch_size()) {
rows += add_rows;
- merge_blocks.emplace_back(std::move(_blocks_queue.front()));
- _blocks_queue.pop_front();
+
merge_blocks.emplace_back(std::move(_blocks_queue_buffered.front()));
+ _blocks_queue_buffered.pop_front();
} else {
break;
}
}
- if (_blocks_queue.empty()) {
- this->reschedule_scanner_ctx();
- _dependency->block();
+ if (_blocks_queue_buffered.empty()) {
+ std::unique_lock l(_transfer_lock);
+ if (_blocks_queue.empty()) {
+ this->reschedule_scanner_ctx();
+ _dependency->block();
+ } else {
+ _blocks_queue_buffered = std::move(_blocks_queue);
+ }
}
_cur_bytes_in_queue -= (*block)->allocated_bytes();
@@ -333,10 +342,13 @@ public:
for (auto& merge_block : merge_blocks) {
_cur_bytes_in_queue -= merge_block->allocated_bytes();
static_cast<void>(m.merge(*merge_block));
- return_free_block(std::move(merge_block));
+ if (merge_block->mem_reuse()) {
+ _free_blocks_buffered.emplace_back(std::move(merge_block));
+ }
}
(*block)->set_columns(std::move(m.mutable_columns()));
}
+ return_free_blocks();
return Status::OK();
}
@@ -353,6 +365,29 @@ public:
set_status_on_error(state, false);
}
}
+
+private:
+ /// Hands every block parked in `_free_blocks_buffered` back to the shared
+ /// `_free_blocks` pool with a single `enqueue_bulk` call instead of one
+ /// enqueue per block.
+ ///
+ /// For each buffered block this clears its column data, adds its
+ /// `allocated_bytes()` to the total reported to `_free_blocks_memory_usage`,
+ /// and overwrites `_estimated_block_bytes`. Afterwards the buffer is emptied
+ /// and `_serving_blocks_num` is decreased by the number of blocks returned.
+ /// Does nothing when the buffer is empty.
+ ///
+ /// NOTE(review): in the visible caller (`get_block_from_queue`) this is
+ /// invoked after both `_transfer_lock` scopes have closed, i.e. without the
+ /// lock held; confirm `_free_blocks`, `_free_blocks_memory_usage` and
+ /// `_serving_blocks_num` are safe to update from here.
+ void return_free_blocks() {
+ if (_free_blocks_buffered.empty()) {
+ return;
+ }
+
+ size_t total_bytes = 0;
+ for (auto& block : _free_blocks_buffered) {
+ const auto bytes = block->allocated_bytes();
+ block->clear_column_data();
+ // Overwritten on every iteration: after the loop it reflects only the
+ // last block's size (floored at 16), not an aggregate over the batch.
+ _estimated_block_bytes = std::max(bytes, (size_t)16);
+ total_bytes += bytes;
+ }
+ _free_blocks_memory_usage->add(total_bytes);
+ const auto count = _free_blocks_buffered.size();
+
_free_blocks.enqueue_bulk(std::make_move_iterator(_free_blocks_buffered.begin()),
count);
+ // Entries were moved out through make_move_iterator above; clear the
+ // moved-from slots so the buffer starts empty on the next call.
+ _free_blocks_buffered.clear();
+ // NOTE(review): the caller only buffers merge blocks whose mem_reuse() is
+ // true, so blocks it drops never reach this decrement — verify this matches
+ // the accounting previously done per block by return_free_block().
+ _serving_blocks_num -= count;
+ }
+
+ // Merge blocks released by get_block_from_queue() that are waiting to be
+ // handed back to `_free_blocks` in one batch by return_free_blocks().
+ std::vector<vectorized::BlockUPtr> _free_blocks_buffered;
+ // Local batch of ready blocks, filled by moving `_blocks_queue` while
+ // `_transfer_lock` is held; get_block_from_queue() only takes the lock when
+ // this batch is empty, so consecutive calls can be served without locking.
+ // NOTE(review): this list is read and popped without `_transfer_lock`;
+ // presumably only a single consumer calls get_block_from_queue() — confirm.
+ // The refill relies on `_blocks_queue` being empty after being moved from;
+ // consider swap() to make that explicit.
+ std::list<vectorized::BlockUPtr> _blocks_queue_buffered;
};
} // namespace doris::pipeline
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]