This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new c2fa60cbe52 [Enhancement](scan) enable parallel scan when preagg is
on (#36302)
c2fa60cbe52 is described below
commit c2fa60cbe52b9ebdf26fbef1f6959519e6a86203
Author: Pxl <[email protected]>
AuthorDate: Fri Jun 14 23:44:41 2024 +0800
[Enhancement](scan) enable parallel scan when preagg is on (#36302)
## Proposed changes
Cherry-picked from #35810.
---
be/src/pipeline/exec/olap_scan_operator.cpp | 69 +++++++++++++----------------
1 file changed, 30 insertions(+), 39 deletions(-)
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp
b/be/src/pipeline/exec/olap_scan_operator.cpp
index c8a9aa3f85d..00650b8a976 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -265,18 +265,11 @@ Status
OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
state()->query_options().resource_limit.__isset.cpu_limit;
if (enable_parallel_scan && !p._should_run_serial && !has_cpu_limit &&
- p._push_down_agg_type == TPushAggOp::NONE) {
+ p._push_down_agg_type == TPushAggOp::NONE &&
+ (_storage_no_merge() || p._olap_scan_node.is_preaggregation)) {
std::vector<TabletWithVersion> tablets;
- bool is_dup_mow_key = true;
for (auto&& scan_range : _scan_ranges) {
auto tablet =
DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id));
- is_dup_mow_key =
- tablet->keys_type() == DUP_KEYS || (tablet->keys_type() ==
UNIQUE_KEYS &&
-
tablet->enable_unique_key_merge_on_write());
- if (!is_dup_mow_key) {
- break;
- }
-
int64_t version = 0;
std::from_chars(scan_range->version.data(),
scan_range->version.data() +
scan_range->version.size(), version);
@@ -284,42 +277,40 @@ Status
OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
TabletWithVersion
{std::dynamic_pointer_cast<Tablet>(tablet), version});
}
- if (is_dup_mow_key) {
- std::vector<OlapScanRange*> key_ranges;
- for (auto& range : _cond_ranges) {
- if (range->begin_scan_range.size() == 1 &&
- range->begin_scan_range.get_value(0) == NEGATIVE_INFINITY)
{
- continue;
- }
- key_ranges.emplace_back(range.get());
+ std::vector<OlapScanRange*> key_ranges;
+ for (auto& range : _cond_ranges) {
+ if (range->begin_scan_range.size() == 1 &&
+ range->begin_scan_range.get_value(0) == NEGATIVE_INFINITY) {
+ continue;
}
+ key_ranges.emplace_back(range.get());
+ }
- ParallelScannerBuilder<OlapScanLocalState> scanner_builder(
- this, tablets, _scanner_profile, key_ranges, state(),
p._limit_per_scanner,
- is_dup_mow_key, p._olap_scan_node.is_preaggregation);
+ ParallelScannerBuilder<OlapScanLocalState> scanner_builder(
+ this, tablets, _scanner_profile, key_ranges, state(),
p._limit_per_scanner, true,
+ p._olap_scan_node.is_preaggregation);
- int max_scanners_count =
state()->parallel_scan_max_scanners_count();
+ int max_scanners_count = state()->parallel_scan_max_scanners_count();
- // If the `max_scanners_count` was not set,
- // use `config::doris_scanner_thread_pool_thread_num` as the
default value.
- if (max_scanners_count <= 0) {
- max_scanners_count =
config::doris_scanner_thread_pool_thread_num;
- }
+ // If the `max_scanners_count` was not set,
+ // use `config::doris_scanner_thread_pool_thread_num` as the default
value.
+ if (max_scanners_count <= 0) {
+ max_scanners_count = config::doris_scanner_thread_pool_thread_num;
+ }
- // Too small value of `min_rows_per_scanner` is meaningless.
- auto min_rows_per_scanner =
- std::max<int64_t>(1024,
state()->parallel_scan_min_rows_per_scanner());
- scanner_builder.set_max_scanners_count(max_scanners_count);
- scanner_builder.set_min_rows_per_scanner(min_rows_per_scanner);
-
- RETURN_IF_ERROR(scanner_builder.build_scanners(*scanners));
- for (auto& scanner : *scanners) {
- auto* olap_scanner =
assert_cast<vectorized::NewOlapScanner*>(scanner.get());
- RETURN_IF_ERROR(olap_scanner->prepare(state(), _conjuncts));
- olap_scanner->set_compound_filters(_compound_filters);
- }
- return Status::OK();
+ // Too small value of `min_rows_per_scanner` is meaningless.
+ auto min_rows_per_scanner =
+ std::max<int64_t>(1024,
state()->parallel_scan_min_rows_per_scanner());
+ scanner_builder.set_max_scanners_count(max_scanners_count);
+ scanner_builder.set_min_rows_per_scanner(min_rows_per_scanner);
+
+ RETURN_IF_ERROR(scanner_builder.build_scanners(*scanners));
+ for (auto& scanner : *scanners) {
+ auto* olap_scanner =
assert_cast<vectorized::NewOlapScanner*>(scanner.get());
+ RETURN_IF_ERROR(olap_scanner->prepare(state(), _conjuncts));
+ olap_scanner->set_compound_filters(_compound_filters);
}
+ return Status::OK();
}
int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]