This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6e3458b79a1 [Try_Fix](scan) try fix the scanner schedule logic to
prevent excessive memory usage and timeout (#30515)
6e3458b79a1 is described below
commit 6e3458b79a1af6e0b18360fb12709f5d66b0e8b9
Author: HappenLee <[email protected]>
AuthorDate: Mon Jan 29 23:41:55 2024 +0800
[Try_Fix](scan) try fix the scanner schedule logic to prevent excessive
memory usage and timeout (#30515)
---
be/src/vec/exec/scan/pip_scanner_context.h | 14 ++++++++++++--
be/src/vec/exec/scan/scanner_context.cpp | 6 +++---
be/src/vec/exec/scan/scanner_context.h | 5 +++--
3 files changed, 18 insertions(+), 7 deletions(-)
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h
index c2490a5e0d0..484fe5b4003 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -40,7 +40,7 @@ public:
_need_colocate_distribute(!_col_distribute_ids.empty()) {}
Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr*
block, bool* eos,
- int id, bool wait = false) override {
+ int id) override {
{
std::unique_lock l(_transfer_lock);
if (state->is_cancelled()) {
@@ -97,6 +97,11 @@ public:
(*block)->set_columns(std::move(m.mutable_columns()));
}
+ // after return free blocks, should try to reschedule the scanner
+ if (should_be_scheduled()) {
+ this->reschedule_scanner_ctx();
+ }
+
return Status::OK();
}
@@ -284,7 +289,7 @@ public:
limit_, max_bytes_in_blocks_queue, 1,
local_state,
dependency) {}
Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr*
block, bool* eos,
- int id, bool wait = false) override {
+ int id) override {
if (_blocks_queue_buffered.empty()) {
std::unique_lock l(_transfer_lock);
if (state->is_cancelled()) {
@@ -349,6 +354,11 @@ public:
}
return_free_blocks();
+ // after return free blocks, should try to reschedule the scanner
+ if (should_be_scheduled()) {
+ this->reschedule_scanner_ctx();
+ }
+
return Status::OK();
}
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 7691f159509..033709f950e 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -273,7 +273,7 @@ bool ScannerContext::empty_in_queue(int id) {
}
Status ScannerContext::get_block_from_queue(RuntimeState* state,
vectorized::BlockUPtr* block,
- bool* eos, int id, bool wait) {
+ bool* eos, int id) {
std::vector<vectorized::BlockUPtr> merge_blocks;
{
std::unique_lock l(_transfer_lock);
@@ -295,9 +295,9 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
}
// Wait for block from queue
- if (wait) {
- // scanner batch wait time
+ {
SCOPED_TIMER(_scanner_wait_batch_timer);
+ // scanner batch wait time
while (!(!_blocks_queue.empty() || done() || !status().ok() ||
state->is_cancelled())) {
if (!is_scheduled && _num_running_scanners == 0 &&
should_be_scheduled()) {
LOG(INFO) << debug_string();
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index 28aec83d6a2..59e4c45a52a 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -89,7 +89,7 @@ public:
// Set eos to true if there is no more data to read.
// And if eos is true, the block returned must be nullptr.
virtual Status get_block_from_queue(RuntimeState* state,
vectorized::BlockUPtr* block,
- bool* eos, int id, bool wait = true);
+ bool* eos, int id);
[[nodiscard]] Status validate_block_schema(Block* block);
@@ -134,7 +134,8 @@ public:
// todo(wb) rethinking how to calculate ```_max_bytes_in_queue``` when
executing shared scan
inline bool should_be_scheduled() const {
- return _cur_bytes_in_queue < _max_bytes_in_queue / 2;
+ return (_cur_bytes_in_queue < _max_bytes_in_queue / 2) &&
+ (_serving_blocks_num < allowed_blocks_num());
}
int get_available_thread_slot_num() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]