This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 9a19581a2c5 [improvement](scanner_schedule) reduce memory consumption
of scanner #24199 (#25547)
9a19581a2c5 is described below
commit 9a19581a2c55c6e78e7e2812a45e1bc3842ffec7
Author: Mingyu Chen <[email protected]>
AuthorDate: Fri Nov 3 19:10:30 2023 +0800
[improvement](scanner_schedule) reduce memory consumption of scanner #24199
(#25547)
---
be/src/exec/exec_node.cpp | 3 +-
be/src/pipeline/exec/scan_operator.cpp | 2 +-
be/src/runtime/plan_fragment_executor.cpp | 1 +
be/src/vec/exec/scan/pip_scanner_context.h | 7 +--
be/src/vec/exec/scan/scanner_context.cpp | 68 +++++++++++++++++-------------
be/src/vec/exec/scan/scanner_context.h | 28 +++++++-----
be/src/vec/exec/scan/scanner_scheduler.cpp | 12 ++----
7 files changed, 66 insertions(+), 55 deletions(-)
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 0dc8df911b2..c9e327ad640 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -41,6 +41,7 @@
#include "runtime/runtime_state.h"
#include "util/debug_util.h"
#include "util/runtime_profile.h"
+#include "util/stack_util.h"
#include "util/uid_util.h"
#include "vec/columns/column_nullable.h"
#include "vec/core/block.h"
@@ -205,7 +206,7 @@ Status ExecNode::close(RuntimeState* state) {
<< " already closed";
return Status::OK();
}
- LOG(INFO) << "fragment_instance_id=" <<
print_id(state->fragment_instance_id()) << " closed";
+ LOG(INFO) << "fragment_instance_id=" <<
print_id(state->fragment_instance_id()) << " closed. ";
_is_closed = true;
Status result;
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index f34461a9fd2..1f15b1d61f8 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -44,7 +44,7 @@ bool ScanOperator::can_read() {
return true;
} else {
if (_node->_scanner_ctx->get_num_running_scanners() == 0 &&
- _node->_scanner_ctx->has_enough_space_in_blocks_queue()) {
+ _node->_scanner_ctx->should_be_scheduled()) {
_node->_scanner_ctx->reschedule_scanner_ctx();
}
return _node->ready_to_read(); // there are some blocks to process
diff --git a/be/src/runtime/plan_fragment_executor.cpp
b/be/src/runtime/plan_fragment_executor.cpp
index df5f4b7d3e4..ff2c0b8688a 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -53,6 +53,7 @@
#include "util/container_util.hpp"
#include "util/defer_op.h"
#include "util/pretty_printer.h"
+#include "util/stack_util.h"
#include "util/telemetry/telemetry.h"
#include "util/threadpool.h"
#include "util/time.h"
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h
b/be/src/vec/exec/scan/pip_scanner_context.h
index b98c628368e..f52bd3bf3c7 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -166,10 +166,6 @@ public:
_free_blocks_memory_usage->add(free_blocks_memory_usage);
}
- bool has_enough_space_in_blocks_queue() const override {
- return _current_used_bytes < _max_bytes_in_queue / 2 *
_num_parallel_instances;
- }
-
void _dispose_coloate_blocks_not_in_queue() override {
if (_need_colocate_distribute) {
for (int i = 0; i < _num_parallel_instances; ++i) {
@@ -221,8 +217,7 @@ private:
std::lock_guard<std::mutex> queue_l(*_queue_mutexs[loc]);
_blocks_queues[loc].emplace_back(std::move(_colocate_blocks[loc]));
}
- bool get_block_not_empty = true;
- _colocate_blocks[loc] = get_free_block(&get_block_not_empty,
get_block_not_empty);
+ _colocate_blocks[loc] = get_free_block();
_colocate_mutable_blocks[loc]->set_muatable_columns(
_colocate_blocks[loc]->mutate_columns());
}
diff --git a/be/src/vec/exec/scan/scanner_context.cpp
b/be/src/vec/exec/scan/scanner_context.cpp
index 478d9fb4cb7..8deb2153478 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -52,7 +52,7 @@ ScannerContext::ScannerContext(doris::RuntimeState* state_,
doris::vectorized::V
_process_status(Status::OK()),
_batch_size(state_->batch_size()),
limit(limit_),
- _max_bytes_in_queue(max_bytes_in_blocks_queue_),
+ _max_bytes_in_queue(max_bytes_in_blocks_queue_ *
num_parallel_instances),
_scanner_scheduler(state_->exec_env()->scanner_scheduler()),
_scanners(scanners_),
_num_parallel_instances(num_parallel_instances) {
@@ -63,26 +63,21 @@ ScannerContext::ScannerContext(doris::RuntimeState* state_,
doris::vectorized::V
if (limit < 0) {
limit = -1;
}
-}
-// After init function call, should not access _parent
-Status ScannerContext::init() {
- // 1. Calculate max concurrency
- // TODO: now the max thread num <=
config::doris_scanner_thread_pool_thread_num / 4
- // should find a more reasonable value.
_max_thread_num = config::doris_scanner_thread_pool_thread_num / 4;
- if (_parent->_shared_scan_opt) {
- DCHECK(_num_parallel_instances > 0);
- _max_thread_num *= _num_parallel_instances;
- }
+ _max_thread_num *= num_parallel_instances;
_max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
DCHECK(_max_thread_num > 0);
_max_thread_num = std::min(_max_thread_num, (int32_t)_scanners.size());
+ // 1. Calculate max concurrency
// For select * from table limit 10; should just use one thread.
if (_parent->should_run_serial()) {
_max_thread_num = 1;
}
+}
+// After init function call, should not access _parent
+Status ScannerContext::init() {
_scanner_profile = _parent->_scanner_profile;
_scanner_sched_counter = _parent->_scanner_sched_counter;
_scanner_ctx_sched_counter = _parent->_scanner_ctx_sched_counter;
@@ -104,6 +99,9 @@ Status ScannerContext::init() {
limit == -1 ? _batch_size :
std::min(static_cast<int64_t>(_batch_size), limit);
_block_per_scanner = (doris_scanner_row_num + (real_block_size - 1)) /
real_block_size;
_free_blocks_capacity = _max_thread_num * _block_per_scanner;
+ auto block = get_free_block();
+ _estimated_block_bytes = std::max(block->allocated_bytes(), (size_t)16);
+ return_free_block(std::move(block));
#ifndef BE_TEST
// 3. get thread token
@@ -123,27 +121,33 @@ Status ScannerContext::init() {
return Status::OK();
}
-vectorized::BlockUPtr ScannerContext::get_free_block(bool* has_free_block,
- bool get_block_not_empty)
{
+vectorized::BlockUPtr ScannerContext::get_free_block() {
vectorized::BlockUPtr block;
if (_free_blocks.try_dequeue(block)) {
- if (!get_block_not_empty || block->mem_reuse()) {
- _free_blocks_capacity--;
- _free_blocks_memory_usage->add(-block->allocated_bytes());
- return block;
- }
+ DCHECK(block->mem_reuse());
+ _free_blocks_memory_usage->add(-block->allocated_bytes());
+ _serving_blocks_num++;
+ return block;
}
+ block = vectorized::Block::create_unique(_output_tuple_desc->slots(),
_batch_size,
+ true /*ignore invalid slots*/);
COUNTER_UPDATE(_newly_create_free_blocks_num, 1);
- return vectorized::Block::create_unique(_output_tuple_desc->slots(),
_batch_size,
- true /*ignore invalid slots*/);
+
+ _serving_blocks_num++;
+ return block;
}
void ScannerContext::return_free_block(std::unique_ptr<vectorized::Block>
block) {
- block->clear_column_data();
- _free_blocks_memory_usage->add(block->allocated_bytes());
- _free_blocks.enqueue(std::move(block));
- ++_free_blocks_capacity;
+ _serving_blocks_num--;
+ if (block->mem_reuse()) {
+ // Only return blocks that have a schema to the free blocks queue,
+ // because colocate blocks need a schema.
+ _estimated_block_bytes = std::max(block->allocated_bytes(),
(size_t)16);
+ block->clear_column_data();
+ _free_blocks_memory_usage->add(block->allocated_bytes());
+ _free_blocks.enqueue(std::move(block));
+ }
}
void
ScannerContext::append_blocks_to_queue(std::vector<vectorized::BlockUPtr>&
blocks) {
@@ -176,7 +180,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState*
state, vectorized::Blo
// (if the scheduler continues to schedule, it will cause a lot of busy
running).
// At this point, consumers are required to trigger new scheduling to
ensure that
// data can be continuously fetched.
- if (has_enough_space_in_blocks_queue() && _num_running_scanners == 0) {
+ if (should_be_scheduled() && _num_running_scanners == 0) {
auto state = _scanner_scheduler->submit(this);
if (state.ok()) {
_num_scheduling_ctx++;
@@ -184,6 +188,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState*
state, vectorized::Blo
set_status_on_error(state, false);
}
}
+
// Wait for block from queue
if (wait) {
SCOPED_TIMER(_scanner_wait_batch_timer);
@@ -207,6 +212,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState*
state, vectorized::Blo
auto block_bytes = (*block)->allocated_bytes();
_cur_bytes_in_queue -= block_bytes;
+
_queued_blocks_memory_usage->add(-block_bytes);
return Status::OK();
} else {
@@ -353,7 +359,13 @@ void
ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
_scanners.push_front(scanner);
}
std::lock_guard l(_transfer_lock);
- if (has_enough_space_in_blocks_queue()) {
+
+ // In pipeline engine, doris will close scanners when `no_schedule`.
+ // We have to decrease _num_running_scanners before schedule, otherwise
+ // scheduling does not work due to _num_running_scanners.
+ _num_running_scanners--;
+
+ if (should_be_scheduled()) {
auto state = _scanner_scheduler->submit(this);
if (state.ok()) {
_num_scheduling_ctx++;
@@ -373,8 +385,6 @@ void
ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
_is_finished = true;
_blocks_queue_added_cv.notify_one();
}
- // In pipeline engine, doris will close scanners when `no_schedule`.
- _num_running_scanners--;
_ctx_finish_cv.notify_one();
}
@@ -384,7 +394,7 @@ void
ScannerContext::get_next_batch_of_scanners(std::list<VScannerSPtr>* current
{
// If there are enough space in blocks queue,
// the scanner number depends on the _free_blocks numbers
- thread_slot_num = cal_thread_slot_num_by_free_block_num();
+ thread_slot_num = get_available_thread_slot_num();
}
// 2. get #thread_slot_num scanners from ctx->scanners
diff --git a/be/src/vec/exec/scan/scanner_context.h
b/be/src/vec/exec/scan/scanner_context.h
index 3aad0d6263f..a345bfc03dd 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -62,12 +62,12 @@ public:
ScannerContext(RuntimeState* state_, VScanNode* parent,
const TupleDescriptor* output_tuple_desc,
const std::list<VScannerSPtr>& scanners_, int64_t limit_,
- int64_t max_bytes_in_blocks_queue_, const int
num_parallel_instances = 0);
+ int64_t max_bytes_in_blocks_queue_, const int
num_parallel_instances = 1);
virtual ~ScannerContext() = default;
virtual Status init();
- vectorized::BlockUPtr get_free_block(bool* has_free_block, bool
get_not_empty_block = false);
+ vectorized::BlockUPtr get_free_block();
void return_free_block(std::unique_ptr<vectorized::Block> block);
// Append blocks from scanners to the blocks queue.
@@ -136,20 +136,25 @@ public:
virtual bool empty_in_queue(int id);
// todo(wb) rethinking how to calculate ```_max_bytes_in_queue``` when
executing shared scan
- virtual inline bool has_enough_space_in_blocks_queue() const {
- return _cur_bytes_in_queue < _max_bytes_in_queue / 2;
+ inline bool should_be_scheduled() const {
+ return (_cur_bytes_in_queue < _max_bytes_in_queue / 2) &&
+ (_serving_blocks_num < allowed_blocks_num());
}
- int cal_thread_slot_num_by_free_block_num() {
+ int get_available_thread_slot_num() {
int thread_slot_num = 0;
- thread_slot_num = (_free_blocks_capacity + _block_per_scanner - 1) /
_block_per_scanner;
+ thread_slot_num = (allowed_blocks_num() + _block_per_scanner - 1) /
_block_per_scanner;
thread_slot_num = std::min(thread_slot_num, _max_thread_num -
_num_running_scanners);
- if (thread_slot_num <= 0) {
- thread_slot_num = 1;
- }
return thread_slot_num;
}
+ int32_t allowed_blocks_num() const {
+ int32_t blocks_num = std::min(_free_blocks_capacity,
+ int32_t((_max_bytes_in_queue +
_estimated_block_bytes - 1) /
+ _estimated_block_bytes));
+ return blocks_num;
+ }
+
void reschedule_scanner_ctx();
// the unique id of this context
@@ -203,10 +208,12 @@ protected:
// Lazy-allocated blocks for all scanners to share, for memory reuse.
moodycamel::ConcurrentQueue<vectorized::BlockUPtr> _free_blocks;
+ std::atomic<int32_t> _serving_blocks_num = 0;
// The current number of free blocks available to the scanners.
// Used to limit the memory usage of the scanner.
// NOTE: this is NOT the size of `_free_blocks`.
- std::atomic_int32_t _free_blocks_capacity = 0;
+ int32_t _free_blocks_capacity = 0;
+ int64_t _estimated_block_bytes = 0;
int _batch_size;
// The limit from SQL's limit clause
@@ -231,6 +238,7 @@ protected:
int64_t _cur_bytes_in_queue = 0;
// The max limit bytes of blocks in blocks queue
const int64_t _max_bytes_in_queue;
+ std::atomic<int64_t> _bytes_allocated = 0;
doris::vectorized::ScannerScheduler* _scanner_scheduler;
// List "scanners" saves all "unfinished" scanners.
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 3481128a1d2..2529ce67e5e 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -321,7 +321,6 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler*
scheduler, ScannerContext
int64_t raw_rows_threshold = raw_rows_read + config::doris_scanner_row_num;
int64_t raw_bytes_read = 0;
int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
- bool has_free_block = true;
int num_rows_in_block = 0;
// Only set to true when ctx->done() return true.
@@ -331,9 +330,8 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler*
scheduler, ScannerContext
bool should_stop = false;
// Has to wait at least one full block, or it will cause a lot of schedule
task in priority
// queue, it will affect query latency and query concurrency for example
ssb 3.3.
- while (!eos && raw_bytes_read < raw_bytes_threshold &&
- ((raw_rows_read < raw_rows_threshold && has_free_block) ||
- num_rows_in_block < state->batch_size())) {
+ while (!eos && raw_bytes_read < raw_bytes_threshold && raw_rows_read <
raw_rows_threshold &&
+ num_rows_in_block < state->batch_size()) {
if (UNLIKELY(ctx->done())) {
// No need to set status on error here.
// Because done() maybe caused by "should_stop"
@@ -341,7 +339,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler*
scheduler, ScannerContext
break;
}
- BlockUPtr block = ctx->get_free_block(&has_free_block);
+ BlockUPtr block = ctx->get_free_block();
status = scanner->get_block(state, block.get(), &eos);
VLOG_ROW << "VScanNode input rows: " << block->rows() << ", eos: " <<
eos;
// The VFileScanner for external table may try to open not exist files,
@@ -357,12 +355,11 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler*
scheduler, ScannerContext
if (status.is<ErrorCode::NOT_FOUND>()) {
// The only case in this "if" branch is external table file delete
and fe cache has not been updated yet.
// Set status to OK.
- LOG(INFO) << "scan range not found: " <<
scanner->get_current_scan_range_name();
status = Status::OK();
eos = true;
}
- raw_bytes_read += block->bytes();
+ raw_bytes_read += block->allocated_bytes();
num_rows_in_block += block->rows();
if (UNLIKELY(block->rows() == 0)) {
ctx->return_free_block(std::move(block));
@@ -397,7 +394,6 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler*
scheduler, ScannerContext
if (eos || should_stop) {
scanner->mark_to_need_to_close();
}
-
ctx->push_back_scanner_and_reschedule(scanner);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]