This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 012c2d746c6 [fix](remote scan) Fix remote scan paralism (#42633)
012c2d746c6 is described below

commit 012c2d746c6a7b8c8a75c88a48acc3c5b4b28dd9
Author: zhiqiang <seuhezhiqi...@163.com>
AuthorDate: Thu Oct 31 11:39:32 2024 +0800

    [fix](remote scan) Fix remote scan paralism (#42633)
---
 be/src/pipeline/exec/scan_operator.cpp     |  2 +-
 be/src/vec/exec/scan/pip_scanner_context.h | 21 +++++++++++++++++----
 be/src/vec/exec/scan/scanner_context.cpp   | 16 +++++++++++-----
 be/src/vec/exec/scan/scanner_context.h     |  6 ++++--
 be/src/vec/exec/scan/vscan_node.cpp        |  2 +-
 5 files changed, 34 insertions(+), 13 deletions(-)

diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index 7e91b501ab6..b631d127032 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -1040,7 +1040,7 @@ Status ScanLocalState<Derived>::_start_scanners(
     auto& p = _parent->cast<typename Derived::Parent>();
     _scanner_ctx = PipXScannerContext::create_shared(
             state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(),
-            _scan_dependency, p.ignore_data_distribution());
+            _scan_dependency, p.ignore_data_distribution(), p.is_file_scan_operator());
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h
index b7684ac5fe3..b0bbaa76f26 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -32,8 +32,20 @@ public:
                       const RowDescriptor* output_row_descriptor,
                       const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
                       int64_t limit_, bool ignore_data_distribution)
-            : vectorized::ScannerContext(state, parent, output_tuple_desc, output_row_descriptor,
-                                         scanners, limit_, ignore_data_distribution) {}
+            : vectorized::ScannerContext(
+                      state, parent, output_tuple_desc, output_row_descriptor, scanners, limit_,
+                      ignore_data_distribution,
+                      /*non-pipeine & old pipeine does not process file scan operator seperatyly*/
+                      /*they use state->query_parallel_instance_num() as num_parallel_instances, see:
+                        _max_thread_num = _state->num_scanner_threads() > 0
+                                  ? _state->num_scanner_threads()
+                                  : config::doris_scanner_thread_pool_thread_num /
+                                            (_local_state ? num_parallel_instances
+                                                          : state->query_parallel_instance_num());
+                      */
+                      // so we set is_file_scan_operator to true
+                      // so that _max_thread_num will be same like before for engine except for pipelineX
+                      true) {}
 };
 
 class PipXScannerContext final : public vectorized::ScannerContext {
@@ -45,9 +57,10 @@ public:
                        const RowDescriptor* output_row_descriptor,
                        const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
                        int64_t limit_, std::shared_ptr<pipeline::Dependency> dependency,
-                       bool ignore_data_distribution)
+                       bool ignore_data_distribution, bool is_file_scan_operator)
             : vectorized::ScannerContext(state, output_tuple_desc, output_row_descriptor, scanners,
-                                         limit_, ignore_data_distribution, local_state) {
+                                         limit_, ignore_data_distribution, is_file_scan_operator,
+                                         local_state) {
         _dependency = dependency;
     }
 
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 5c71f7f7cb7..ee34c5fb774 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -44,6 +44,7 @@ ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* outpu
                                const RowDescriptor* output_row_descriptor,
                                const std::list<std::shared_ptr<ScannerDelegate>>& scanners,
                                int64_t limit_, bool ignore_data_distribution,
+                               bool is_file_scan_operator,
                                pipeline::ScanLocalStateBase* local_state)
         : HasTaskExecutionCtx(state),
           _state(state),
@@ -56,7 +57,8 @@ ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* outpu
           limit(limit_),
           _scanner_scheduler_global(state->exec_env()->scanner_scheduler()),
           _all_scanners(scanners.begin(), scanners.end()),
-          _ignore_data_distribution(ignore_data_distribution) {
+          _ignore_data_distribution(ignore_data_distribution),
+          _is_file_scan_operator(is_file_scan_operator) {
     DCHECK(_output_row_descriptor == nullptr ||
            _output_row_descriptor->tuple_descriptors().size() == 1);
     _query_id = _state->get_query_ctx()->query_id();
@@ -77,9 +79,10 @@ ScannerContext::ScannerContext(doris::RuntimeState* state, doris::vectorized::VS
                                const RowDescriptor* output_row_descriptor,
                                const std::list<std::shared_ptr<ScannerDelegate>>& scanners,
                                int64_t limit_, bool ignore_data_distribution,
+                               bool is_file_scan_operator,
                                pipeline::ScanLocalStateBase* local_state)
         : ScannerContext(state, output_tuple_desc, output_row_descriptor, scanners, limit_,
-                         ignore_data_distribution, local_state) {
+                         ignore_data_distribution, is_file_scan_operator, local_state) {
     _parent = parent;
 
     // No need to increase scanner_ctx_cnt here. Since other constructor has already done it.
@@ -160,7 +163,8 @@ Status ScannerContext::init() {
     }
 
     // _scannner_scheduler will be used to submit scan task.
-    if (_scanner_scheduler->get_queue_size() * 2 > config::doris_scanner_thread_pool_queue_size) {
+    if (_scanner_scheduler->get_queue_size() * 2 > config::doris_scanner_thread_pool_queue_size ||
+        _is_file_scan_operator) {
         submit_many_scan_tasks_for_potential_performance_issue = false;
     }
 
@@ -183,8 +187,10 @@ Status ScannerContext::init() {
     if (submit_many_scan_tasks_for_potential_performance_issue || _ignore_data_distribution) {
         _max_thread_num = config::doris_scanner_thread_pool_thread_num / 1;
     } else {
-        _max_thread_num =
-                4 * (config::doris_scanner_thread_pool_thread_num / num_parallel_instances);
+        const int scale_arg = _is_file_scan_operator ? 1 : 4;
+        _max_thread_num = scale_arg * (config::doris_scanner_thread_pool_thread_num /
+                                       num_parallel_instances);
+
         // In some rare cases, user may set num_parallel_instances to 1 handly to make many query could be executed
         // in parallel. We need to make sure the _max_thread_num is smaller than previous value.
         _max_thread_num =
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index 449a5b1b470..681ee68bf38 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -107,7 +107,7 @@ public:
     ScannerContext(RuntimeState* state, VScanNode* parent, const TupleDescriptor* output_tuple_desc,
                    const RowDescriptor* output_row_descriptor,
                    const std::list<std::shared_ptr<ScannerDelegate>>& scanners, int64_t limit_,
-                   bool ignore_data_distribution,
+                   bool ignore_data_distribution, bool is_file_scan_operator,
                    pipeline::ScanLocalStateBase* local_state = nullptr);
 
     ~ScannerContext() override {
@@ -183,7 +183,8 @@ protected:
     ScannerContext(RuntimeState* state_, const TupleDescriptor* output_tuple_desc,
                    const RowDescriptor* output_row_descriptor,
                    const std::list<std::shared_ptr<ScannerDelegate>>& scanners_, int64_t limit_,
-                   bool ignore_data_distribution, pipeline::ScanLocalStateBase* local_state);
+                   bool ignore_data_distribution, bool is_file_scan_operator,
+                   pipeline::ScanLocalStateBase* local_state);
 
     /// Four criteria to determine whether to increase the parallelism of the scanners
     /// 1. It ran for at least `SCALE_UP_DURATION` ms after last scale up
@@ -234,6 +235,7 @@ protected:
     RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr;
     QueryThreadContext _query_thread_context;
     bool _ignore_data_distribution = false;
+    bool _is_file_scan_operator;
 
     // for scaling up the running scanners
     size_t _estimated_block_size = 0;
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index a7842188feb..8348641e5ce 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -321,7 +321,7 @@ void VScanNode::_start_scanners(const std::list<std::shared_ptr<ScannerDelegate>
     } else {
         _scanner_ctx = ScannerContext::create_shared(_state, this, _output_tuple_desc,
                                                      _output_row_descriptor.get(), scanners,
-                                                     limit(), false);
+                                                     limit(), false, true);
     }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
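
For readers skimming the diff, the parallelism decision touched in ScannerContext::init() can be restated as the minimal C++ sketch below. This is an approximation, not the committed code: the helper name compute_max_thread_num, the kScannerPoolThreads constant, and the final clamps are illustrative assumptions; only the scale_arg branch (1 for file scan operators, 4 otherwise) and the file-scan opt-out of the "submit many tasks" path mirror the hunks above.

#include <algorithm>

// Illustrative stand-in for config::doris_scanner_thread_pool_thread_num.
constexpr int kScannerPoolThreads = 48;

// Hypothetical helper approximating the updated branch in ScannerContext::init().
// File scan operators (remote/external scans) keep scale_arg = 1, so their
// parallelism stays at the pre-patch level; internal scans keep the 4x scale-up.
int compute_max_thread_num(bool is_file_scan_operator, bool ignore_data_distribution,
                           bool submit_many_scan_tasks, int num_parallel_instances,
                           int num_scanners) {
    // File scan operators now always skip the "submit many tasks" fast path.
    if (is_file_scan_operator) {
        submit_many_scan_tasks = false;
    }
    if (num_parallel_instances <= 0) {
        num_parallel_instances = 1; // defensive guard for the division below
    }
    int max_thread_num = 0;
    if (submit_many_scan_tasks || ignore_data_distribution) {
        max_thread_num = kScannerPoolThreads;
    } else {
        const int scale_arg = is_file_scan_operator ? 1 : 4;
        max_thread_num = scale_arg * (kScannerPoolThreads / num_parallel_instances);
        // Assumed clamp: keep the result no larger than the whole pool even when
        // num_parallel_instances is set to 1 by hand (see the comment in the diff).
        max_thread_num = std::min(max_thread_num, kScannerPoolThreads);
    }
    // Assumed guard: never ask for more threads than there are scanners.
    return std::max(1, std::min(max_thread_num, num_scanners));
}

For example, with the constants above, compute_max_thread_num(true, false, false, 8, 16) yields 6 (48 / 8 with scale_arg = 1), whereas an internal scan with the same inputs would get 24.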