This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 092a394782 [improvement](agg)limit the output of agg node (#11461)
092a394782 is described below
commit 092a3947822b316d3e84ee7681f53a9d03044b75
Author: starocean999 <[email protected]>
AuthorDate: Fri Aug 5 07:53:55 2022 +0800
[improvement](agg)limit the output of agg node (#11461)
* [improvement](agg)limit the output of agg node
---
.../vec/aggregate_functions/aggregate_function.h | 31 +++++
be/src/vec/common/columns_hashing.h | 18 +++
be/src/vec/common/columns_hashing_impl.h | 39 ++++++
be/src/vec/common/hash_table/hash_map.h | 2 +
be/src/vec/common/hash_table/ph_hash_map.h | 14 ++
be/src/vec/exec/vaggregation_node.cpp | 144 +++++++++++----------
be/src/vec/exec/vaggregation_node.h | 138 ++++++++++++++++++++
be/src/vec/exprs/vectorized_agg_fn.cpp | 7 +
be/src/vec/exprs/vectorized_agg_fn.h | 3 +
9 files changed, 326 insertions(+), 70 deletions(-)
diff --git a/be/src/vec/aggregate_functions/aggregate_function.h b/be/src/vec/aggregate_functions/aggregate_function.h
index 3e20a4db51..30b2e2c959 100644
--- a/be/src/vec/aggregate_functions/aggregate_function.h
+++ b/be/src/vec/aggregate_functions/aggregate_function.h
@@ -94,6 +94,11 @@ public:
virtual void merge_vec(const AggregateDataPtr* places, size_t offset,
ConstAggregateDataPtr rhs,
Arena* arena, const size_t num_rows) const = 0;
+ // same as merge_vec, but only call "merge" function when place is not nullptr
+ virtual void merge_vec_selected(const AggregateDataPtr* places, size_t
offset,
+ ConstAggregateDataPtr rhs, Arena* arena,
+ const size_t num_rows) const = 0;
+
/// Serializes state (to transmit it over the network, for example).
virtual void serialize(ConstAggregateDataPtr __restrict place,
BufferWritable& buf) const = 0;
@@ -132,6 +137,11 @@ public:
virtual void add_batch(size_t batch_size, AggregateDataPtr* places, size_t
place_offset,
const IColumn** columns, Arena* arena) const = 0;
+ // same as add_batch, but only call "add" function when place is not nullptr
+ virtual void add_batch_selected(size_t batch_size, AggregateDataPtr*
places,
+ size_t place_offset, const IColumn**
columns,
+ Arena* arena) const = 0;
+
/** The same for single place.
*/
virtual void add_batch_single_place(size_t batch_size, AggregateDataPtr
place,
@@ -169,6 +179,15 @@ public:
}
}
+ void add_batch_selected(size_t batch_size, AggregateDataPtr* places,
size_t place_offset,
+ const IColumn** columns, Arena* arena) const
override {
+ for (size_t i = 0; i < batch_size; ++i) {
+ if (places[i]) {
+ static_cast<const Derived*>(this)->add(places[i] +
place_offset, columns, i, arena);
+ }
+ }
+ }
+
void add_batch_single_place(size_t batch_size, AggregateDataPtr place,
const IColumn** columns,
Arena* arena) const override {
for (size_t i = 0; i < batch_size; ++i) {
@@ -228,6 +247,18 @@ public:
arena);
}
}
+
+ void merge_vec_selected(const AggregateDataPtr* places, size_t offset,
+ ConstAggregateDataPtr rhs, Arena* arena,
+ const size_t num_rows) const override {
+ const auto size_of_data = static_cast<const
Derived*>(this)->size_of_data();
+ for (size_t i = 0; i != num_rows; ++i) {
+ if (places[i]) {
+ static_cast<const Derived*>(this)->merge(places[i] + offset,
rhs + size_of_data * i,
+ arena);
+ }
+ }
+ }
};
/// Implements several methods for manipulation with data. T - type of structure with data for aggregation.
diff --git a/be/src/vec/common/columns_hashing.h b/be/src/vec/common/columns_hashing.h
index 580588c480..844fc26ca7 100644
--- a/be/src/vec/common/columns_hashing.h
+++ b/be/src/vec/common/columns_hashing.h
@@ -257,6 +257,24 @@ struct HashMethodSingleLowNullableColumn : public
SingleColumnMethod {
} else
return EmplaceResult(inserted);
}
+
+ template <typename Data>
+ ALWAYS_INLINE FindResult find_key(Data& data, size_t row, Arena& pool) {
+ if (key_columns[0]->is_null_at(row)) {
+ bool has_null_key = data.has_null_key_data();
+ if constexpr (has_mapped)
+ return FindResult(&data.get_null_key_data(), has_null_key);
+ else
+ return FindResult(has_null_key);
+ }
+ auto key_holder = Base::get_key_holder(row, pool);
+ auto key = key_holder_get_key(key_holder);
+ auto it = data.find(key);
+ if constexpr (has_mapped)
+ return FindResult(it ? lookup_result_get_mapped(it) : nullptr, it
!= nullptr);
+ else
+ return FindResult(it != nullptr);
+ }
};
} // namespace ColumnsHashing
diff --git a/be/src/vec/common/columns_hashing_impl.h b/be/src/vec/common/columns_hashing_impl.h
index 2abfa3b8e8..5b5f13b86a 100644
--- a/be/src/vec/common/columns_hashing_impl.h
+++ b/be/src/vec/common/columns_hashing_impl.h
@@ -146,6 +146,12 @@ public:
return find_key_impl(key_holder_get_key(key_holder), data);
}
+ template <typename Data>
+ ALWAYS_INLINE FindResult find_key(Data& data, size_t hash_value, size_t
row, Arena& pool) {
+ auto key_holder = static_cast<Derived&>(*this).get_key_holder(row,
pool);
+ return find_key_impl(key_holder_get_key(key_holder), hash_value, data);
+ }
+
template <typename Data>
ALWAYS_INLINE size_t get_hash(const Data& data, size_t row, Arena& pool) {
auto key_holder = static_cast<Derived&>(*this).get_key_holder(row,
pool);
@@ -290,6 +296,39 @@ protected:
else
return FindResult(it != nullptr);
}
+
+ template <typename Data, typename Key>
+ ALWAYS_INLINE FindResult find_key_impl(Key key, size_t hash_value, Data&
data) {
+ if constexpr (Cache::consecutive_keys_optimization) {
+ if (cache.check(key)) {
+ if constexpr (has_mapped)
+ return FindResult(&cache.value.second, cache.found);
+ else
+ return FindResult(cache.found);
+ }
+ }
+
+ auto it = data.find(key, hash_value);
+
+ if constexpr (consecutive_keys_optimization) {
+ cache.found = it != nullptr;
+ cache.empty = false;
+
+ if constexpr (has_mapped) {
+ cache.value.first = key;
+ if (it) {
+ cache.value.second = *lookup_result_get_mapped(it);
+ }
+ } else {
+ cache.value = key;
+ }
+ }
+
+ if constexpr (has_mapped)
+ return FindResult(it ? lookup_result_get_mapped(it) : nullptr, it
!= nullptr);
+ else
+ return FindResult(it != nullptr);
+ }
};
template <typename T>
diff --git a/be/src/vec/common/hash_table/hash_map.h b/be/src/vec/common/hash_table/hash_map.h
index 0b849c13fd..25e1b74c51 100644
--- a/be/src/vec/common/hash_table/hash_map.h
+++ b/be/src/vec/common/hash_table/hash_map.h
@@ -68,6 +68,8 @@ struct HashMapCell {
const value_type& get_value() const { return value; }
static const Key& get_key(const value_type& value) { return value.first; }
+ Mapped& get_mapped() { return value.second; }
+ const Mapped& get_mapped() const { return value.second; }
bool key_equals(const Key& key_) const { return value.first == key_; }
bool key_equals(const Key& key_, size_t /*hash_*/) const { return
value.first == key_; }
diff --git a/be/src/vec/common/hash_table/ph_hash_map.h b/be/src/vec/common/hash_table/ph_hash_map.h
index 5f06f6a09f..3a56ce7988 100644
--- a/be/src/vec/common/hash_table/ph_hash_map.h
+++ b/be/src/vec/common/hash_table/ph_hash_map.h
@@ -140,6 +140,20 @@ public:
}
}
+ template <typename KeyHolder>
+ LookupResult ALWAYS_INLINE find(KeyHolder&& key_holder) {
+ const auto& key = key_holder_get_key(key_holder);
+ auto it = _hash_map.find(key);
+ return it != _hash_map.end() ? &*it : nullptr;
+ }
+
+ template <typename KeyHolder>
+ LookupResult ALWAYS_INLINE find(KeyHolder&& key_holder, size_t hash_value)
{
+ const auto& key = key_holder_get_key(key_holder);
+ auto it = _hash_map.find(key, hash_value);
+ return it != _hash_map.end() ? &*it : nullptr;
+ }
+
size_t hash(const Key& x) const { return _hash_map.hash(x); }
void ALWAYS_INLINE prefetch_by_hash(size_t hash_value) {
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index 80817a0095..1bdccbd614 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -27,7 +27,6 @@
#include "vec/data_types/data_type_string.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
-#include "vec/exprs/vslot_ref.h"
#include "vec/utils/util.hpp"
namespace doris::vectorized {
@@ -393,6 +392,10 @@ Status AggregationNode::prepare(RuntimeState* state) {
_executor.update_memusage =
std::bind<void>(&AggregationNode::_update_memusage_with_serialized_key, this);
_executor.close =
std::bind<void>(&AggregationNode::_close_with_serialized_key, this);
+
+ _should_limit_output = _limit != -1 && // has limit
+ !_vconjunct_ctx_ptr && // no having conjunct
+ _needs_finalize; // agg's finalize step
}
return Status::OK();
@@ -704,6 +707,11 @@ bool AggregationNode::_should_expand_preagg_hash_tables() {
_agg_data._aggregated_method_variant);
}
+size_t AggregationNode::_get_hash_table_size() {
+ return std::visit([&](auto&& agg_method) { return agg_method.data.size();
},
+ _agg_data._aggregated_method_variant);
+}
+
void AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places,
ColumnRawPtrs& key_columns,
const size_t num_rows) {
std::visit(
@@ -774,6 +782,63 @@ void
AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR
_agg_data._aggregated_method_variant);
}
+void AggregationNode::_find_in_hash_table(AggregateDataPtr* places,
ColumnRawPtrs& key_columns,
+ size_t rows) {
+ std::visit(
+ [&](auto&& agg_method) -> void {
+ using HashMethodType = std::decay_t<decltype(agg_method)>;
+ using HashTableType = std::decay_t<decltype(agg_method.data)>;
+ using AggState = typename HashMethodType::State;
+ AggState state(key_columns, _probe_key_sz, nullptr);
+
+ _pre_serialize_key_if_need(state, agg_method, key_columns,
rows);
+
+ std::vector<size_t> hash_values;
+
+ if constexpr (HashTableTraits<HashTableType>::is_phmap) {
+ if (hash_values.size() < rows) hash_values.resize(rows);
+ if constexpr
(ColumnsHashing::IsPreSerializedKeysHashMethodTraits<
+ AggState>::value) {
+ for (size_t i = 0; i < rows; ++i) {
+ hash_values[i] =
agg_method.data.hash(agg_method.keys[i]);
+ }
+ } else {
+ for (size_t i = 0; i < rows; ++i) {
+ hash_values[i] =
+
agg_method.data.hash(state.get_key_holder(i, _agg_arena_pool));
+ }
+ }
+ }
+
+ /// For all rows.
+ for (size_t i = 0; i < rows; ++i) {
+ auto find_result = [&]() {
+ if constexpr
(HashTableTraits<HashTableType>::is_phmap) {
+ if (LIKELY(i + HASH_MAP_PREFETCH_DIST < rows)) {
+ if constexpr
(HashTableTraits<HashTableType>::is_parallel_phmap) {
+
agg_method.data.prefetch_by_key(state.get_key_holder(
+ i + HASH_MAP_PREFETCH_DIST,
_agg_arena_pool));
+ } else
+ agg_method.data.prefetch_by_hash(
+ hash_values[i +
HASH_MAP_PREFETCH_DIST]);
+ }
+
+ return state.find_key(agg_method.data,
hash_values[i], i,
+ _agg_arena_pool);
+ } else {
+ return state.find_key(agg_method.data, i,
_agg_arena_pool);
+ }
+ }();
+
+ if (find_result.is_found()) {
+ places[i] = find_result.get_mapped();
+ } else
+ places[i] = nullptr;
+ }
+ },
+ _agg_data._aggregated_method_variant);
+}
+
Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block*
in_block,
doris::vectorized::Block*
out_block) {
SCOPED_TIMER(_build_timer);
@@ -896,34 +961,11 @@ Status
AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
}
Status AggregationNode::_execute_with_serialized_key(Block* block) {
- SCOPED_TIMER(_build_timer);
- DCHECK(!_probe_expr_ctxs.empty());
-
- size_t key_size = _probe_expr_ctxs.size();
- ColumnRawPtrs key_columns(key_size);
- {
- SCOPED_TIMER(_expr_timer);
- for (size_t i = 0; i < key_size; ++i) {
- int result_column_id = -1;
- RETURN_IF_ERROR(_probe_expr_ctxs[i]->execute(block,
&result_column_id));
- block->get_by_position(result_column_id).column =
- block->get_by_position(result_column_id)
- .column->convert_to_full_column_if_const();
- key_columns[i] =
block->get_by_position(result_column_id).column.get();
- }
- }
-
- int rows = block->rows();
- PODArray<AggregateDataPtr> places(rows);
-
- _emplace_into_hash_table(places.data(), key_columns, rows);
-
- for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
- _aggregate_evaluators[i]->execute_batch_add(block,
_offsets_of_aggregate_states[i],
- places.data(),
&_agg_arena_pool);
+ if (_reach_limit) {
+ return _execute_with_serialized_key_helper<true>(block);
+ } else {
+ return _execute_with_serialized_key_helper<false>(block);
}
-
- return Status::OK();
}
Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state,
Block* block,
@@ -1111,49 +1153,11 @@ Status
AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
}
Status AggregationNode::_merge_with_serialized_key(Block* block) {
- SCOPED_TIMER(_merge_timer);
-
- size_t key_size = _probe_expr_ctxs.size();
- ColumnRawPtrs key_columns(key_size);
-
- for (size_t i = 0; i < key_size; ++i) {
- int result_column_id = -1;
- RETURN_IF_ERROR(_probe_expr_ctxs[i]->execute(block,
&result_column_id));
- key_columns[i] = block->get_by_position(result_column_id).column.get();
- }
-
- int rows = block->rows();
- PODArray<AggregateDataPtr> places(rows);
-
- _emplace_into_hash_table(places.data(), key_columns, rows);
-
- for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
- DCHECK(_aggregate_evaluators[i]->input_exprs_ctxs().size() == 1 &&
-
_aggregate_evaluators[i]->input_exprs_ctxs()[0]->root()->is_slot_ref());
- int col_id =
-
((VSlotRef*)_aggregate_evaluators[i]->input_exprs_ctxs()[0]->root())->column_id();
- if (_aggregate_evaluators[i]->is_merge()) {
- auto column = block->get_by_position(col_id).column;
- if (column->is_nullable()) {
- column =
((ColumnNullable*)column.get())->get_nested_column_ptr();
- }
-
- std::unique_ptr<char[]> deserialize_buffer(
- new
char[_aggregate_evaluators[i]->function()->size_of_data() * rows]);
-
-
_aggregate_evaluators[i]->function()->deserialize_vec(deserialize_buffer.get(),
-
(ColumnString*)(column.get()),
-
&_agg_arena_pool, rows);
- _aggregate_evaluators[i]->function()->merge_vec(
- places.data(), _offsets_of_aggregate_states[i],
deserialize_buffer.get(),
- &_agg_arena_pool, rows);
-
- } else {
- _aggregate_evaluators[i]->execute_batch_add(block,
_offsets_of_aggregate_states[i],
- places.data(),
&_agg_arena_pool);
- }
+ if (_reach_limit) {
+ return _merge_with_serialized_key_helper<true>(block);
+ } else {
+ return _merge_with_serialized_key_helper<false>(block);
}
- return Status::OK();
}
void AggregationNode::_update_memusage_with_serialized_key() {
diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h
index ba7cf24b70..7702102e6a 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -26,6 +26,7 @@
#include "vec/common/columns_hashing.h"
#include "vec/common/hash_table/fixed_hash_map.h"
#include "vec/exprs/vectorized_agg_fn.h"
+#include "vec/exprs/vslot_ref.h"
namespace doris {
class TPlanNode;
@@ -674,11 +675,16 @@ private:
bool _should_expand_hash_table = true;
std::vector<char*> _streaming_pre_places;
+ bool _should_limit_output = false;
+ bool _reach_limit = false;
+
private:
/// Return true if we should keep expanding hash tables in the preagg. If false,
/// the preagg should pass through any rows it can't fit in its tables.
bool _should_expand_preagg_hash_tables();
+ size_t _get_hash_table_size();
+
void _make_nullable_output_key(Block* block);
Status _create_agg_status(AggregateDataPtr data);
@@ -710,9 +716,141 @@ private:
}
}
+ template <bool limit>
+ Status _execute_with_serialized_key_helper(Block* block) {
+ SCOPED_TIMER(_build_timer);
+ DCHECK(!_probe_expr_ctxs.empty());
+
+ size_t key_size = _probe_expr_ctxs.size();
+ ColumnRawPtrs key_columns(key_size);
+ {
+ SCOPED_TIMER(_expr_timer);
+ for (size_t i = 0; i < key_size; ++i) {
+ int result_column_id = -1;
+ RETURN_IF_ERROR(_probe_expr_ctxs[i]->execute(block,
&result_column_id));
+ block->get_by_position(result_column_id).column =
+ block->get_by_position(result_column_id)
+ .column->convert_to_full_column_if_const();
+ key_columns[i] =
block->get_by_position(result_column_id).column.get();
+ }
+ }
+
+ int rows = block->rows();
+ PODArray<AggregateDataPtr> places(rows);
+
+ if constexpr (limit) {
+ _find_in_hash_table(places.data(), key_columns, rows);
+
+ for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+ _aggregate_evaluators[i]->execute_batch_add_selected(
+ block, _offsets_of_aggregate_states[i], places.data(),
&_agg_arena_pool);
+ }
+ } else {
+ _emplace_into_hash_table(places.data(), key_columns, rows);
+
+ for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+ _aggregate_evaluators[i]->execute_batch_add(block,
_offsets_of_aggregate_states[i],
+ places.data(),
&_agg_arena_pool);
+ }
+
+ if (_should_limit_output) {
+ _reach_limit = _get_hash_table_size() >= _limit;
+ }
+ }
+
+ return Status::OK();
+ }
+
+ template <bool limit>
+ Status _merge_with_serialized_key_helper(Block* block) {
+ SCOPED_TIMER(_merge_timer);
+
+ size_t key_size = _probe_expr_ctxs.size();
+ ColumnRawPtrs key_columns(key_size);
+
+ for (size_t i = 0; i < key_size; ++i) {
+ int result_column_id = -1;
+ RETURN_IF_ERROR(_probe_expr_ctxs[i]->execute(block,
&result_column_id));
+ key_columns[i] =
block->get_by_position(result_column_id).column.get();
+ }
+
+ int rows = block->rows();
+ PODArray<AggregateDataPtr> places(rows);
+
+ if constexpr (limit) {
+ _find_in_hash_table(places.data(), key_columns, rows);
+
+ for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+ DCHECK(_aggregate_evaluators[i]->input_exprs_ctxs().size() ==
1 &&
+
_aggregate_evaluators[i]->input_exprs_ctxs()[0]->root()->is_slot_ref());
+ int col_id =
((VSlotRef*)_aggregate_evaluators[i]->input_exprs_ctxs()[0]->root())
+ ->column_id();
+ if (_aggregate_evaluators[i]->is_merge()) {
+ auto column = block->get_by_position(col_id).column;
+ if (column->is_nullable()) {
+ column =
((ColumnNullable*)column.get())->get_nested_column_ptr();
+ }
+
+ std::unique_ptr<char[]> deserialize_buffer(
+ new
char[_aggregate_evaluators[i]->function()->size_of_data() * rows]);
+
+ _aggregate_evaluators[i]->function()->deserialize_vec(
+ deserialize_buffer.get(),
(ColumnString*)(column.get()),
+ &_agg_arena_pool, rows);
+ _aggregate_evaluators[i]->function()->merge_vec_selected(
+ places.data(), _offsets_of_aggregate_states[i],
+ deserialize_buffer.get(), &_agg_arena_pool, rows);
+
+ } else {
+ _aggregate_evaluators[i]->execute_batch_add_selected(
+ block, _offsets_of_aggregate_states[i],
places.data(),
+ &_agg_arena_pool);
+ }
+ }
+ } else {
+ _emplace_into_hash_table(places.data(), key_columns, rows);
+
+ for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+ DCHECK(_aggregate_evaluators[i]->input_exprs_ctxs().size() ==
1 &&
+
_aggregate_evaluators[i]->input_exprs_ctxs()[0]->root()->is_slot_ref());
+ int col_id =
((VSlotRef*)_aggregate_evaluators[i]->input_exprs_ctxs()[0]->root())
+ ->column_id();
+ if (_aggregate_evaluators[i]->is_merge()) {
+ auto column = block->get_by_position(col_id).column;
+ if (column->is_nullable()) {
+ column =
((ColumnNullable*)column.get())->get_nested_column_ptr();
+ }
+
+ std::unique_ptr<char[]> deserialize_buffer(
+ new
char[_aggregate_evaluators[i]->function()->size_of_data() * rows]);
+
+ _aggregate_evaluators[i]->function()->deserialize_vec(
+ deserialize_buffer.get(),
(ColumnString*)(column.get()),
+ &_agg_arena_pool, rows);
+ _aggregate_evaluators[i]->function()->merge_vec(
+ places.data(), _offsets_of_aggregate_states[i],
+ deserialize_buffer.get(), &_agg_arena_pool, rows);
+
+ } else {
+ _aggregate_evaluators[i]->execute_batch_add(block,
+
_offsets_of_aggregate_states[i],
+ places.data(),
&_agg_arena_pool);
+ }
+ }
+
+ if (_should_limit_output) {
+ _reach_limit = _get_hash_table_size() >= _limit;
+ }
+ }
+
+ return Status::OK();
+ }
+
void _emplace_into_hash_table(AggregateDataPtr* places, ColumnRawPtrs&
key_columns,
const size_t num_rows);
+ void _find_in_hash_table(AggregateDataPtr* places, ColumnRawPtrs&
key_columns, size_t num_rows);
+
void release_tracker();
using vectorized_execute = std::function<Status(Block* block)>;
diff --git a/be/src/vec/exprs/vectorized_agg_fn.cpp b/be/src/vec/exprs/vectorized_agg_fn.cpp
index 527cb40e18..1cd7198ec0 100644
--- a/be/src/vec/exprs/vectorized_agg_fn.cpp
+++ b/be/src/vec/exprs/vectorized_agg_fn.cpp
@@ -156,6 +156,13 @@ void AggFnEvaluator::execute_batch_add(Block* block,
size_t offset, AggregateDat
_function->add_batch(block->rows(), places, offset, _agg_columns.data(),
arena);
}
+void AggFnEvaluator::execute_batch_add_selected(Block* block, size_t offset,
+ AggregateDataPtr* places,
Arena* arena) {
+ _calc_argment_columns(block);
+ SCOPED_TIMER(_exec_timer);
+ _function->add_batch_selected(block->rows(), places, offset,
_agg_columns.data(), arena);
+}
+
void AggFnEvaluator::insert_result_info(AggregateDataPtr place, IColumn*
column) {
_function->insert_result_into(place, *column);
}
diff --git a/be/src/vec/exprs/vectorized_agg_fn.h b/be/src/vec/exprs/vectorized_agg_fn.h
index 52098f0d8f..e9d86b4e28 100644
--- a/be/src/vec/exprs/vectorized_agg_fn.h
+++ b/be/src/vec/exprs/vectorized_agg_fn.h
@@ -58,6 +58,9 @@ public:
void execute_batch_add(Block* block, size_t offset, AggregateDataPtr*
places,
Arena* arena = nullptr);
+ void execute_batch_add_selected(Block* block, size_t offset,
AggregateDataPtr* places,
+ Arena* arena = nullptr);
+
void insert_result_info(AggregateDataPtr place, IColumn* column);
void insert_result_info_vec(const std::vector<AggregateDataPtr>& place,
size_t offset,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]