This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ae50d7a614a [Scanner](revert) revert the scanner change by
apache#35604 (#31508)
ae50d7a614a is described below
commit ae50d7a614a8c62b47aea59340054676be62cdf2
Author: HappenLee <[email protected]>
AuthorDate: Mon Jun 3 00:07:07 2024 +0800
[Scanner](revert) revert the scanner change by apache#35604 (#31508)
---
be/src/vec/exec/scan/scanner_context.cpp | 29 +++++++++++++++--------------
1 file changed, 15 insertions(+), 14 deletions(-)
diff --git a/be/src/vec/exec/scan/scanner_context.cpp
b/be/src/vec/exec/scan/scanner_context.cpp
index fd570c99a68..813643978d0 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -296,12 +296,12 @@ Status ScannerContext::get_block_from_queue(RuntimeState*
state, vectorized::Blo
// `_free_blocks` serve all running scanners, maybe it's
too large for the remaining scanners
int free_blocks_for_each = _free_blocks.size_approx() /
_num_running_scanners;
_num_running_scanners--;
- std::vector<vectorized::BlockUPtr>
blocks(free_blocks_for_each);
- free_blocks_for_each =
- _free_blocks.try_dequeue_bulk(blocks.data(),
free_blocks_for_each);
for (int i = 0; i < free_blocks_for_each; ++i) {
- _free_blocks_memory_usage -=
blocks[i]->allocated_bytes();
-
_free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
+ vectorized::BlockUPtr removed_block;
+ if (_free_blocks.try_dequeue(removed_block)) {
+ _free_blocks_memory_usage -=
block->allocated_bytes();
+
_free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
+ }
}
}
} else {
@@ -350,16 +350,17 @@ void ScannerContext::_try_to_scale_up() {
(_max_bytes_in_queue - _free_blocks_memory_usage) /
_estimated_block_size;
num_add = std::min(num_add, most_add);
}
- std::vector<std::weak_ptr<ScannerDelegate>> scale_up_scanners(num_add);
- // get enough memory to launch one more scanner.
- if (auto real_size =
_scanners.try_dequeue_bulk(scale_up_scanners.data(), num_add);
- real_size) {
- for (int i = 0; i < real_size; ++i) {
-
submit_scan_task(std::make_shared<ScanTask>(scale_up_scanners[i]));
+ for (int i = 0; i < num_add; ++i) {
+ // get enough memory to launch one more scanner.
+ std::weak_ptr<ScannerDelegate> scale_up_scanner;
+ if (_scanners.try_dequeue(scale_up_scanner)) {
+ submit_scan_task(std::make_shared<ScanTask>(scale_up_scanner));
+ _num_running_scanners++;
+ _scale_up_scanners_counter->update(1);
+ is_scale_up = true;
+ } else {
+ break;
}
- _num_running_scanners += real_size;
- _scale_up_scanners_counter->update(real_size);
- is_scale_up = true;
}
if (is_scale_up) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]