This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch ckb2
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/ckb2 by this push:
new ffb91ec5f33 use lazy_emplace_batch in simple count (#61490)
ffb91ec5f33 is described below
commit ffb91ec5f336a4afb131c0b2fe5da529814975f0
Author: Mryange <[email protected]>
AuthorDate: Thu Mar 19 10:11:01 2026 +0800
use lazy_emplace_batch in simple count (#61490)
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here, e.g.:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add the pick label for each branch this PR
should be merged into -->
---
be/src/exec/common/agg_utils.h | 5 -----
be/src/exec/operator/aggregation_sink_operator.cpp | 21 ++++++++++-----------
.../operator/streaming_aggregation_operator.cpp | 13 ++++++-------
3 files changed, 16 insertions(+), 23 deletions(-)
diff --git a/be/src/exec/common/agg_utils.h b/be/src/exec/common/agg_utils.h
index b613a68a5ab..cba7ee62a76 100644
--- a/be/src/exec/common/agg_utils.h
+++ b/be/src/exec/common/agg_utils.h
@@ -99,7 +99,6 @@ struct AggDataVariantsBase : public
DataVariants<MethodVariants, MethodSingleNul
MethodOneNumber,
DataWithNullKey> {
void init_agg_data(const std::vector<DataTypePtr>& data_types, HashKeyType
type) {
bool nullable = data_types.size() == 1 && data_types[0]->is_nullable();
-
switch (type) {
case HashKeyType::without_key:
break;
@@ -177,11 +176,7 @@ struct AggregatedDataVariants
AggregatedDataWithNullableShortStringKey>
{
AggregatedDataWithoutKey without_key = nullptr;
- bool is_fixed_key = true;
-
void init(const std::vector<DataTypePtr>& data_types, HashKeyType type) {
- is_fixed_key = !(type == HashKeyType::without_key || type ==
HashKeyType::EMPTY ||
- type == HashKeyType::serialized || type ==
HashKeyType::string_key);
this->init_agg_data(data_types, type);
}
};
diff --git a/be/src/exec/operator/aggregation_sink_operator.cpp
b/be/src/exec/operator/aggregation_sink_operator.cpp
index 43b9364a65d..21547b91da9 100644
--- a/be/src/exec/operator/aggregation_sink_operator.cpp
+++ b/be/src/exec/operator/aggregation_sink_operator.cpp
@@ -174,7 +174,7 @@ Status AggSinkLocalState::open(RuntimeState* state) {
p._aggregate_evaluators[0]->function()->is_simple_count()) /* only
one count(*) */
&& !_should_limit_output /* no limit optimization */ &&
!Base::_shared_state->enable_spill /* spill not enabled */) {
- _shared_state->use_simple_count =
_shared_state->agg_data->is_fixed_key;
+ _shared_state->use_simple_count = true;
#ifndef NDEBUG
// Randomly enable/disable in debug mode to verify correctness of
multi-phase agg promotion/demotion.
_shared_state->use_simple_count = rand() % 2 == 0;
@@ -639,11 +639,10 @@ void
AggSinkLocalState::_emplace_into_hash_table_inline_count(ColumnRawPtrs& key
auto creator_for_null_key = [&](auto& mapped) {
mapped = nullptr; };
SCOPED_TIMER(_hash_table_emplace_timer);
- for (size_t i = 0; i < num_rows; ++i) {
- auto* mapped_ptr =
agg_method.lazy_emplace(state, i, creator,
-
creator_for_null_key);
- ++reinterpret_cast<UInt64&>(*mapped_ptr);
- }
+ lazy_emplace_batch(agg_method, state, num_rows,
creator,
+ creator_for_null_key,
[&](uint32_t, auto& mapped) {
+
++reinterpret_cast<UInt64&>(mapped);
+ });
COUNTER_UPDATE(_hash_table_input_counter,
num_rows);
}},
@@ -680,11 +679,11 @@ void
AggSinkLocalState::_merge_into_hash_table_inline_count(ColumnRawPtrs& key_c
auto creator_for_null_key = [&](auto& mapped) {
mapped = nullptr; };
SCOPED_TIMER(_hash_table_emplace_timer);
- for (size_t i = 0; i < num_rows; ++i) {
- auto* mapped_ptr =
agg_method.lazy_emplace(state, i, creator,
-
creator_for_null_key);
- reinterpret_cast<UInt64&>(*mapped_ptr) +=
col_data[i].count;
- }
+ lazy_emplace_batch(
+ agg_method, state, num_rows, creator,
creator_for_null_key,
+ [&](uint32_t i, auto& mapped) {
+ reinterpret_cast<UInt64&>(mapped) +=
col_data[i].count;
+ });
COUNTER_UPDATE(_hash_table_input_counter,
num_rows);
}},
diff --git a/be/src/exec/operator/streaming_aggregation_operator.cpp
b/be/src/exec/operator/streaming_aggregation_operator.cpp
index eab1f691ab1..48a32958ec6 100644
--- a/be/src/exec/operator/streaming_aggregation_operator.cpp
+++ b/be/src/exec/operator/streaming_aggregation_operator.cpp
@@ -155,8 +155,8 @@ Status StreamingAggLocalState::open(RuntimeState* state) {
// StreamingAgg only operates in update + serialize mode: input is raw
data, output is serialized intermediate state.
// The serialization format of count is UInt64 itself, so it can be
inlined into the hash table mapped slot.
if (_aggregate_evaluators.size() == 1 &&
- _aggregate_evaluators[0]->function()->is_simple_count()) {
- _use_simple_count = _agg_data->is_fixed_key;
+ _aggregate_evaluators[0]->function()->is_simple_count() && limit ==
-1) {
+ _use_simple_count = true;
#ifndef NDEBUG
// Randomly enable/disable in debug mode to verify correctness of
multi-phase agg promotion/demotion.
_use_simple_count = rand() % 2 == 0;
@@ -944,11 +944,10 @@ void
StreamingAggLocalState::_emplace_into_hash_table_inline_count(ColumnRawPtrs
auto creator_for_null_key = [&](auto& mapped) {
mapped = nullptr; };
SCOPED_TIMER(_hash_table_emplace_timer);
- for (size_t i = 0; i < num_rows; ++i) {
- auto* mapped_ptr =
agg_method.lazy_emplace(state, i, creator,
-
creator_for_null_key);
- ++reinterpret_cast<UInt64&>(*mapped_ptr);
- }
+ lazy_emplace_batch(agg_method, state, num_rows,
creator,
+ creator_for_null_key,
[&](uint32_t, auto& mapped) {
+
++reinterpret_cast<UInt64&>(mapped);
+ });
COUNTER_UPDATE(_hash_table_input_counter,
num_rows);
}},
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]