This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d3339005710 [opt](multi-catalog) Optimize remote scan concurrency. (#51415)
d3339005710 is described below
commit d333900571008bbceb27350e40df3b87790e26a9
Author: Qi Chen <[email protected]>
AuthorDate: Thu Jun 5 11:11:40 2025 +0800
[opt](multi-catalog) Optimize remote scan concurrency. (#51415)
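### What problem does this PR solve?
Problem Summary: When calculating the maximum number of scanners for
external-table (file) scans, the backend divided
`config::doris_scanner_thread_pool_thread_num` by the parallel instance count
instead of using the remote scan thread count returned by
`ScannerScheduler::get_remote_scan_thread_num()`.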
### Release note
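[opt] (multi-catalog) Optimize remote scan concurrency.
1. Use `ScannerScheduler::get_remote_scan_thread_num()` instead of
`config::doris_scanner_thread_pool_thread_num` when calculating the maximum
number of scanners (and the KV-cache shard count) for external tables.
2. Remove the `parallel_scan_max_scanners_count()` lower bound (and the floor
of 1) from the max-scanner calculation.
3. Compute the remote scan thread count once and cache it in a function-local
static, so it is no longer re-evaluated from the config on every call.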
---
be/src/pipeline/exec/file_scan_operator.cpp | 11 +++++------
be/src/vec/exec/scan/scanner_scheduler.cpp | 11 ++++++-----
2 files changed, 11 insertions(+), 11 deletions(-)
diff --git a/be/src/pipeline/exec/file_scan_operator.cpp
b/be/src/pipeline/exec/file_scan_operator.cpp
index ef94e3f1c80..b2d51515e73 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -39,9 +39,9 @@ Status
FileScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>* sc
auto& p = _parent->cast<FileScanOperatorX>();
// There's only one scan range for each backend in batch split mode. Each
backend only starts up one ScanNode instance.
- uint32_t shard_num =
- std::min(config::doris_scanner_thread_pool_thread_num /
p.query_parallel_instance_num(),
- _max_scanners);
+ uint32_t shard_num =
std::min(vectorized::ScannerScheduler::get_remote_scan_thread_num() /
+ p.query_parallel_instance_num(),
+ _max_scanners);
shard_num = std::max(shard_num, 1U);
_kv_cache.reset(new vectorized::ShardedKVCache(shard_num));
for (int i = 0; i < _max_scanners; ++i) {
@@ -65,9 +65,8 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state,
auto& p = _parent->cast<FileScanOperatorX>();
auto calc_max_scanners = [&](int parallel_instance_num) -> int {
- int max_scanners = config::doris_scanner_thread_pool_thread_num /
parallel_instance_num;
- max_scanners =
- std::max(std::max(max_scanners,
state->parallel_scan_max_scanners_count()), 1);
+ int max_scanners =
+ vectorized::ScannerScheduler::get_remote_scan_thread_num() /
parallel_instance_num;
if (should_run_serial()) {
max_scanners = 1;
}
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index c0dd6ecaa8e..c2e84009542 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -402,11 +402,12 @@ void
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
}
int ScannerScheduler::get_remote_scan_thread_num() {
- int remote_max_thread_num =
config::doris_max_remote_scanner_thread_pool_thread_num != -1
- ?
config::doris_max_remote_scanner_thread_pool_thread_num
- : std::max(512, CpuInfo::num_cores() *
10);
- remote_max_thread_num =
- std::max(remote_max_thread_num,
config::doris_scanner_thread_pool_thread_num);
+ static int remote_max_thread_num = []() {
+ int num = config::doris_max_remote_scanner_thread_pool_thread_num != -1
+ ?
config::doris_max_remote_scanner_thread_pool_thread_num
+ : std::max(512, CpuInfo::num_cores() * 10);
+ return std::max(num, config::doris_scanner_thread_pool_thread_num);
+ }();
return remote_max_thread_num;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]