This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 012c2d746c6 [fix](remote scan) Fix remote scan parallelism (#42633)
012c2d746c6 is described below

commit 012c2d746c6a7b8c8a75c88a48acc3c5b4b28dd9
Author: zhiqiang <seuhezhiqi...@163.com>
AuthorDate: Thu Oct 31 11:39:32 2024 +0800

    [fix](remote scan) Fix remote scan parallelism (#42633)
---
 be/src/pipeline/exec/scan_operator.cpp     |  2 +-
 be/src/vec/exec/scan/pip_scanner_context.h | 21 +++++++++++++++++----
 be/src/vec/exec/scan/scanner_context.cpp   | 16 +++++++++++-----
 be/src/vec/exec/scan/scanner_context.h     |  6 ++++--
 be/src/vec/exec/scan/vscan_node.cpp        |  2 +-
 5 files changed, 34 insertions(+), 13 deletions(-)

diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index 7e91b501ab6..b631d127032 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -1040,7 +1040,7 @@ Status ScanLocalState<Derived>::_start_scanners(
     auto& p = _parent->cast<typename Derived::Parent>();
     _scanner_ctx = PipXScannerContext::create_shared(
             state(), this, p._output_tuple_desc, p.output_row_descriptor(), 
scanners, p.limit(),
-            _scan_dependency, p.ignore_data_distribution());
+            _scan_dependency, p.ignore_data_distribution(), 
p.is_file_scan_operator());
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h 
b/be/src/vec/exec/scan/pip_scanner_context.h
index b7684ac5fe3..b0bbaa76f26 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -32,8 +32,20 @@ public:
                       const RowDescriptor* output_row_descriptor,
                       const 
std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
                       int64_t limit_, bool ignore_data_distribution)
-            : vectorized::ScannerContext(state, parent, output_tuple_desc, 
output_row_descriptor,
-                                         scanners, limit_, 
ignore_data_distribution) {}
+            : vectorized::ScannerContext(
+                      state, parent, output_tuple_desc, output_row_descriptor, 
scanners, limit_,
+                      ignore_data_distribution,
+                      /*non-pipeine & old pipeine does not process file scan 
operator seperatyly*/
+                      /*they use state->query_parallel_instance_num() as 
num_parallel_instances, see:
+                        _max_thread_num = _state->num_scanner_threads() > 0
+                              ? _state->num_scanner_threads()
+                              : config::doris_scanner_thread_pool_thread_num /
+                                        (_local_state ? num_parallel_instances
+                                                      : 
state->query_parallel_instance_num());
+                                            */
+                      // so we set is_file_scan_operator to true
+                      // so that _max_thread_num will be same like before for 
engine except for pipelineX
+                      true) {}
 };
 
 class PipXScannerContext final : public vectorized::ScannerContext {
@@ -45,9 +57,10 @@ public:
                        const RowDescriptor* output_row_descriptor,
                        const 
std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
                        int64_t limit_, std::shared_ptr<pipeline::Dependency> 
dependency,
-                       bool ignore_data_distribution)
+                       bool ignore_data_distribution, bool 
is_file_scan_operator)
             : vectorized::ScannerContext(state, output_tuple_desc, 
output_row_descriptor, scanners,
-                                         limit_, ignore_data_distribution, 
local_state) {
+                                         limit_, ignore_data_distribution, 
is_file_scan_operator,
+                                         local_state) {
         _dependency = dependency;
     }
 
diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
index 5c71f7f7cb7..ee34c5fb774 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -44,6 +44,7 @@ ScannerContext::ScannerContext(RuntimeState* state, const 
TupleDescriptor* outpu
                                const RowDescriptor* output_row_descriptor,
                                const 
std::list<std::shared_ptr<ScannerDelegate>>& scanners,
                                int64_t limit_, bool ignore_data_distribution,
+                               bool is_file_scan_operator,
                                pipeline::ScanLocalStateBase* local_state)
         : HasTaskExecutionCtx(state),
           _state(state),
@@ -56,7 +57,8 @@ ScannerContext::ScannerContext(RuntimeState* state, const 
TupleDescriptor* outpu
           limit(limit_),
           _scanner_scheduler_global(state->exec_env()->scanner_scheduler()),
           _all_scanners(scanners.begin(), scanners.end()),
-          _ignore_data_distribution(ignore_data_distribution) {
+          _ignore_data_distribution(ignore_data_distribution),
+          _is_file_scan_operator(is_file_scan_operator) {
     DCHECK(_output_row_descriptor == nullptr ||
            _output_row_descriptor->tuple_descriptors().size() == 1);
     _query_id = _state->get_query_ctx()->query_id();
@@ -77,9 +79,10 @@ ScannerContext::ScannerContext(doris::RuntimeState* state, 
doris::vectorized::VS
                                const RowDescriptor* output_row_descriptor,
                                const 
std::list<std::shared_ptr<ScannerDelegate>>& scanners,
                                int64_t limit_, bool ignore_data_distribution,
+                               bool is_file_scan_operator,
                                pipeline::ScanLocalStateBase* local_state)
         : ScannerContext(state, output_tuple_desc, output_row_descriptor, 
scanners, limit_,
-                         ignore_data_distribution, local_state) {
+                         ignore_data_distribution, is_file_scan_operator, 
local_state) {
     _parent = parent;
 
     // No need to increase scanner_ctx_cnt here. Since other constructor has 
already done it.
@@ -160,7 +163,8 @@ Status ScannerContext::init() {
     }
 
     // _scannner_scheduler will be used to submit scan task.
-    if (_scanner_scheduler->get_queue_size() * 2 > 
config::doris_scanner_thread_pool_queue_size) {
+    if (_scanner_scheduler->get_queue_size() * 2 > 
config::doris_scanner_thread_pool_queue_size ||
+        _is_file_scan_operator) {
         submit_many_scan_tasks_for_potential_performance_issue = false;
     }
 
@@ -183,8 +187,10 @@ Status ScannerContext::init() {
         if (submit_many_scan_tasks_for_potential_performance_issue || 
_ignore_data_distribution) {
             _max_thread_num = config::doris_scanner_thread_pool_thread_num / 1;
         } else {
-            _max_thread_num =
-                    4 * (config::doris_scanner_thread_pool_thread_num / 
num_parallel_instances);
+            const int scale_arg = _is_file_scan_operator ? 1 : 4;
+            _max_thread_num = scale_arg * 
(config::doris_scanner_thread_pool_thread_num /
+                                           num_parallel_instances);
+
             // In some rare cases, user may set num_parallel_instances to 1 
handly to make many query could be executed
             // in parallel. We need to make sure the _max_thread_num is 
smaller than previous value.
             _max_thread_num =
diff --git a/be/src/vec/exec/scan/scanner_context.h 
b/be/src/vec/exec/scan/scanner_context.h
index 449a5b1b470..681ee68bf38 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -107,7 +107,7 @@ public:
     ScannerContext(RuntimeState* state, VScanNode* parent, const 
TupleDescriptor* output_tuple_desc,
                    const RowDescriptor* output_row_descriptor,
                    const std::list<std::shared_ptr<ScannerDelegate>>& 
scanners, int64_t limit_,
-                   bool ignore_data_distribution,
+                   bool ignore_data_distribution, bool is_file_scan_operator,
                    pipeline::ScanLocalStateBase* local_state = nullptr);
 
     ~ScannerContext() override {
@@ -183,7 +183,8 @@ protected:
     ScannerContext(RuntimeState* state_, const TupleDescriptor* 
output_tuple_desc,
                    const RowDescriptor* output_row_descriptor,
                    const std::list<std::shared_ptr<ScannerDelegate>>& 
scanners_, int64_t limit_,
-                   bool ignore_data_distribution, 
pipeline::ScanLocalStateBase* local_state);
+                   bool ignore_data_distribution, bool is_file_scan_operator,
+                   pipeline::ScanLocalStateBase* local_state);
 
     /// Four criteria to determine whether to increase the parallelism of the 
scanners
     /// 1. It ran for at least `SCALE_UP_DURATION` ms after last scale up
@@ -234,6 +235,7 @@ protected:
     RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr;
     QueryThreadContext _query_thread_context;
     bool _ignore_data_distribution = false;
+    bool _is_file_scan_operator;
 
     // for scaling up the running scanners
     size_t _estimated_block_size = 0;
diff --git a/be/src/vec/exec/scan/vscan_node.cpp 
b/be/src/vec/exec/scan/vscan_node.cpp
index a7842188feb..8348641e5ce 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -321,7 +321,7 @@ void VScanNode::_start_scanners(const 
std::list<std::shared_ptr<ScannerDelegate>
     } else {
         _scanner_ctx = ScannerContext::create_shared(_state, this, 
_output_tuple_desc,
                                                      
_output_row_descriptor.get(), scanners,
-                                                     limit(), false);
+                                                     limit(), false, true);
     }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to