This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch opt_perf
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/opt_perf by this push:
new c5ec7601d4 [Opt](Vectorized) Support push down no grouping agg (#12881)
c5ec7601d4 is described below
commit c5ec7601d4a45493051292c7234997c622a46f36
Author: HappenLee <[email protected]>
AuthorDate: Thu Sep 22 19:46:21 2022 +0800
[Opt](Vectorized) Support push down no grouping agg (#12881)
---
be/src/olap/iterators.h | 1 +
be/src/olap/reader.h | 1 +
be/src/olap/rowset/beta_rowset_reader.cpp | 1 +
be/src/olap/rowset/rowset_reader_context.h | 1 +
be/src/olap/rowset/segment_v2/column_reader.cpp | 42 ++++++
be/src/olap/rowset/segment_v2/column_reader.h | 12 ++
be/src/olap/rowset/segment_v2/segment.cpp | 8 +-
be/src/olap/rowset/segment_v2/segment_iterator.cpp | 1 -
be/src/olap/rowset/segment_v2/segment_iterator.h | 4 +-
be/src/vec/exec/scan/new_olap_scanner.cpp | 9 +-
be/src/vec/exec/volap_scanner.cpp | 8 +-
be/src/vec/olap/block_reader.cpp | 1 +
be/src/vec/olap/vgeneric_iterators.cpp | 79 ++++++++++
be/src/vec/olap/vgeneric_iterators.h | 6 +
be/test/vec/exec/vgeneric_iterators_test.cpp | 1 -
.../org/apache/doris/catalog/PrimitiveType.java | 4 +
.../org/apache/doris/planner/OlapScanNode.java | 11 ++
.../apache/doris/planner/SingleNodePlanner.java | 160 +++++++++++++++++++++
.../java/org/apache/doris/qe/SessionVariable.java | 13 +-
gensrc/thrift/PlanNodes.thrift | 8 ++
20 files changed, 359 insertions(+), 12 deletions(-)
diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h
index 22f081d0eb..4f12118c2c 100644
--- a/be/src/olap/iterators.h
+++ b/be/src/olap/iterators.h
@@ -77,6 +77,7 @@ public:
std::vector<ColumnPredicate*> column_predicates;
std::unordered_map<int32_t, std::shared_ptr<AndBlockColumnPredicate>>
col_id_to_predicates;
std::unordered_map<int32_t, std::vector<const ColumnPredicate*>>
col_id_to_del_predicates;
+ TPushAggOp::type push_down_agg_type_opt = TPushAggOp::NONE;
// REQUIRED (null is not allowed)
OlapReaderStatistics* stats = nullptr;
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index 004e75c773..ae476e4fa2 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -91,6 +91,7 @@ public:
// use only in vec exec engine
std::vector<uint32_t>* origin_return_columns = nullptr;
std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set =
nullptr;
+ TPushAggOp::type push_down_agg_type_opt = TPushAggOp::NONE;
// used for comapction to record row ids
bool record_rowids = false;
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp
b/be/src/olap/rowset/beta_rowset_reader.cpp
index df15b72f62..87893927d5 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -49,6 +49,7 @@ Status BetaRowsetReader::init(RowsetReaderContext*
read_context) {
// convert RowsetReaderContext to StorageReadOptions
StorageReadOptions read_options;
read_options.stats = _stats;
+ read_options.push_down_agg_type_opt = _context->push_down_agg_type_opt;
if (read_context->lower_bound_keys != nullptr) {
for (int i = 0; i < read_context->lower_bound_keys->size(); ++i) {
read_options.key_ranges.emplace_back(&read_context->lower_bound_keys->at(i),
diff --git a/be/src/olap/rowset/rowset_reader_context.h
b/be/src/olap/rowset/rowset_reader_context.h
index de61117426..ce2fd4b721 100644
--- a/be/src/olap/rowset/rowset_reader_context.h
+++ b/be/src/olap/rowset/rowset_reader_context.h
@@ -41,6 +41,7 @@ struct RowsetReaderContext {
std::vector<uint32_t>* read_orderby_key_columns = nullptr;
// projection columns: the set of columns rowset reader should return
const std::vector<uint32_t>* return_columns = nullptr;
+ TPushAggOp::type push_down_agg_type_opt = TPushAggOp::NONE;
// column name -> column predicate
// adding column_name for predicate to make use of column selectivity
const std::vector<ColumnPredicate*>* predicates = nullptr;
diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp
b/be/src/olap/rowset/segment_v2/column_reader.cpp
index d42358c5e7..451b8f3e91 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -171,6 +171,44 @@ Status ColumnReader::get_row_ranges_by_zone_map(
return Status::OK();
}
+// Fills `dst` with `*n` rows synthesized from the segment-level zone map instead of
+// reading real column data: the first row carries the segment max value and the
+// remaining `*n - 1` rows carry the segment min value.
+// A null min/max is written through ColumnNullable, so `dst` must be a ColumnNullable
+// whenever the zone map may hold nulls.
+// Returns an error if this column has no zone map index.
+Status ColumnReader::next_batch_of_zone_map(size_t* n, vectorized::MutableColumnPtr& dst) const {
+    // The zone map index is optional (match_condition() explicitly tolerates a null
+    // `_zone_map_index_meta`), so it must be checked before it is dereferenced.
+    if (_zone_map_index_meta == nullptr) {
+        return Status::InternalError("segment zone map does not exist");
+    }
+    // Nothing requested. Without this guard `*n - 1` below would wrap around.
+    if (*n == 0) {
+        return Status::OK();
+    }
+
+    // TODO: this work to get min/max value seems should only do once
+    FieldType type = _type_info->type();
+    std::unique_ptr<WrapperField> min_value(WrapperField::create_by_type(type, _meta.length()));
+    std::unique_ptr<WrapperField> max_value(WrapperField::create_by_type(type, _meta.length()));
+    _parse_zone_map(_zone_map_index_meta->segment_zone_map(), min_value.get(), max_value.get());
+
+    dst->reserve(*n);
+    const bool is_string = is_olap_string_type(type);
+
+    // Row 0: the max value.
+    if (max_value->is_null()) {
+        assert_cast<vectorized::ColumnNullable&>(*dst).insert_default();
+    } else if (is_string) {
+        auto sv = (StringValue*)max_value->cell_ptr();
+        dst->insert_data(sv->ptr, sv->len);
+    } else {
+        dst->insert_many_fix_len_data(static_cast<const char*>(max_value->cell_ptr()), 1);
+    }
+
+    // Rows 1 .. *n - 1: the min value repeated.
+    const size_t size = *n - 1;
+    if (min_value->is_null()) {
+        assert_cast<vectorized::ColumnNullable&>(*dst).insert_null_elements(size);
+    } else if (is_string) {
+        auto sv = (StringValue*)min_value->cell_ptr();
+        dst->insert_many_data(sv->ptr, sv->len, size);
+    } else {
+        // TODO: the work may cause performance problem, opt latter
+        for (size_t i = 0; i < size; ++i) {
+            dst->insert_many_fix_len_data(static_cast<const char*>(min_value->cell_ptr()), 1);
+        }
+    }
+
+    return Status::OK();
+}
+
bool ColumnReader::match_condition(const AndBlockColumnPredicate*
col_predicates) const {
if (_zone_map_index_meta == nullptr) {
return true;
@@ -710,6 +748,10 @@ Status FileColumnIterator::next_batch(size_t* n,
ColumnBlockView* dst, bool* has
return Status::OK();
}
+Status FileColumnIterator::next_batch_of_zone_map(size_t* n,
vectorized::MutableColumnPtr& dst) {
+ return _reader->next_batch_of_zone_map(n, dst);
+}
+
Status FileColumnIterator::next_batch(size_t* n, vectorized::MutableColumnPtr&
dst,
bool* has_null) {
size_t curr_size = dst->byte_size();
diff --git a/be/src/olap/rowset/segment_v2/column_reader.h
b/be/src/olap/rowset/segment_v2/column_reader.h
index 33c7c418cd..d165dff067 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.h
+++ b/be/src/olap/rowset/segment_v2/column_reader.h
@@ -125,6 +125,8 @@ public:
// Return true if segment zone map is absent or `cond' could be satisfied,
false otherwise.
bool match_condition(const AndBlockColumnPredicate* col_predicates) const;
+ Status next_batch_of_zone_map(size_t* n, vectorized::MutableColumnPtr&
dst) const;
+
// get row ranges with zone map
// - cond_column is user's query predicate
// - delete_condition is a delete predicate of one version
@@ -256,6 +258,10 @@ public:
return Status::NotSupported("next_batch not implement");
}
+ virtual Status next_batch_of_zone_map(size_t* n,
vectorized::MutableColumnPtr& dst) {
+ return Status::NotSupported("next_batch_of_zone_map not implement");
+ }
+
virtual Status read_by_rowids(const rowid_t* rowids, const size_t count,
vectorized::MutableColumnPtr& dst) {
return Status::NotSupported("read_by_rowids not implement");
@@ -299,6 +305,8 @@ public:
Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool*
has_null) override;
+ Status next_batch_of_zone_map(size_t* n, vectorized::MutableColumnPtr&
dst) override;
+
Status read_by_rowids(const rowid_t* rowids, const size_t count,
vectorized::MutableColumnPtr& dst) override;
@@ -447,6 +455,10 @@ public:
Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool*
has_null) override;
+ Status next_batch_of_zone_map(size_t* n, vectorized::MutableColumnPtr&
dst) override {
+ return next_batch(n, dst);
+ }
+
Status read_by_rowids(const rowid_t* rowids, const size_t count,
vectorized::MutableColumnPtr& dst) override;
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp
b/be/src/olap/rowset/segment_v2/segment.cpp
index 1f1cb59729..d5f585c3f3 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -35,6 +35,7 @@
#include "olap/tablet_schema.h"
#include "util/crc32c.h"
#include "util/slice.h" // Slice
+#include "vec/olap/vgeneric_iterators.h"
namespace doris {
namespace segment_v2 {
@@ -100,7 +101,12 @@ Status Segment::new_iterator(const Schema& schema, const
StorageReadOptions& rea
}
RETURN_IF_ERROR(load_index());
- iter->reset(new SegmentIterator(this->shared_from_this(), schema));
+ if (read_options.col_id_to_del_predicates.empty() &&
+ read_options.push_down_agg_type_opt != TPushAggOp::NONE) {
+
iter->reset(vectorized::new_vstatistics_iterator(this->shared_from_this(),
schema));
+ } else {
+ iter->reset(new SegmentIterator(this->shared_from_this(), schema));
+ }
iter->get()->init(read_options);
return Status::OK();
}
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 376545b067..98f2cfae27 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -22,7 +22,6 @@
#include <utility>
#include "common/status.h"
-#include "gutil/strings/substitute.h"
#include "olap/column_predicate.h"
#include "olap/olap_common.h"
#include "olap/row_block2.h"
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h
b/be/src/olap/rowset/segment_v2/segment_iterator.h
index 2454167a33..d2820ff74b 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.h
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.h
@@ -145,8 +145,8 @@ private:
std::shared_ptr<Segment> _segment;
const Schema& _schema;
- // _column_iterators.size() == _schema.num_columns()
- // map<unique_id, ColumnIterator*>
_column_iterators/_bitmap_index_iterators;
+ // _column_iterators_map.size() == _schema.num_columns()
+ // map<unique_id, ColumnIterator*>
_column_iterators_map/_bitmap_index_iterators;
// can use _schema get unique_id by cid
std::map<int32_t, ColumnIterator*> _column_iterators;
std::map<int32_t, BitmapIndexIterator*> _bitmap_index_iterators;
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index b85652abf0..5d88345cf5 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -147,12 +147,14 @@ Status NewOlapScanner::_init_tablet_reader_params(
->rowset()
->rowset_meta()
->is_segments_overlapping());
-
+ auto real_parent = reinterpret_cast<NewOlapScanNode*>(_parent);
if (_state->skip_storage_engine_merge()) {
_tablet_reader_params.direct_mode = true;
_aggregation = true;
} else {
- _tablet_reader_params.direct_mode = _aggregation || single_version;
+ _tablet_reader_params.direct_mode =
+ _aggregation || single_version ||
+ real_parent->_olap_scan_node.__isset.push_down_agg_type_opt;
}
RETURN_IF_ERROR(_init_return_columns(!_tablet_reader_params.direct_mode));
@@ -161,6 +163,9 @@ Status NewOlapScanner::_init_tablet_reader_params(
_tablet_reader_params.tablet_schema = _tablet_schema;
_tablet_reader_params.reader_type = READER_QUERY;
_tablet_reader_params.aggregation = _aggregation;
+ if (real_parent->_olap_scan_node.__isset.push_down_agg_type_opt)
+ _tablet_reader_params.push_down_agg_type_opt =
+ real_parent->_olap_scan_node.push_down_agg_type_opt;
_tablet_reader_params.version = Version(0, _version);
// Condition
diff --git a/be/src/vec/exec/volap_scanner.cpp
b/be/src/vec/exec/volap_scanner.cpp
index 4157bdb89d..9258d48611 100644
--- a/be/src/vec/exec/volap_scanner.cpp
+++ b/be/src/vec/exec/volap_scanner.cpp
@@ -175,15 +175,18 @@ Status VOlapScanner::_init_tablet_reader_params(
_tablet_reader_params.direct_mode = true;
_aggregation = true;
} else {
- _tablet_reader_params.direct_mode = _aggregation || single_version;
+ _tablet_reader_params.direct_mode = _aggregation || single_version ||
+
_parent->_olap_scan_node.__isset.push_down_agg_type_opt;
}
-
RETURN_IF_ERROR(_init_return_columns(!_tablet_reader_params.direct_mode));
_tablet_reader_params.tablet = _tablet;
_tablet_reader_params.tablet_schema = _tablet_schema;
_tablet_reader_params.reader_type = READER_QUERY;
_tablet_reader_params.aggregation = _aggregation;
+ if (_parent->_olap_scan_node.__isset.push_down_agg_type_opt)
+ _tablet_reader_params.push_down_agg_type_opt =
+ _parent->_olap_scan_node.push_down_agg_type_opt;
_tablet_reader_params.version = Version(0, _version);
// Condition
@@ -283,7 +286,6 @@ Status VOlapScanner::_init_return_columns(bool
need_seq_col) {
int32_t index = slot->col_unique_id() >= 0
?
_tablet_schema->field_index(slot->col_unique_id())
:
_tablet_schema->field_index(slot->col_name());
-
if (index < 0) {
std::stringstream ss;
ss << "field name is invalid. field=" << slot->col_name();
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index 570315b431..f794d97ab5 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -49,6 +49,7 @@ Status BlockReader::_init_collect_iter(const ReaderParams&
read_params,
_reader_context.batch_size = _batch_size;
_reader_context.is_vec = true;
+ _reader_context.push_down_agg_type_opt =
read_params.push_down_agg_type_opt;
for (auto& rs_reader : rs_readers) {
RETURN_NOT_OK(rs_reader->init(&_reader_context));
Status res = _vcollect_iter.add_child(rs_reader);
diff --git a/be/src/vec/olap/vgeneric_iterators.cpp
b/be/src/vec/olap/vgeneric_iterators.cpp
index 280d7b054a..e64668a838 100644
--- a/be/src/vec/olap/vgeneric_iterators.cpp
+++ b/be/src/vec/olap/vgeneric_iterators.cpp
@@ -21,6 +21,8 @@
#include "common/status.h"
#include "olap/iterators.h"
+#include "olap/rowset/segment_v2/column_reader.h"
+#include "olap/rowset/segment_v2/segment.h"
#include "olap/schema.h"
#include "vec/core/block.h"
@@ -111,6 +113,79 @@ Status VAutoIncrementIterator::init(const
StorageReadOptions& opts) {
return Status::OK();
}
+// Iterator that answers a pushed-down no-grouping aggregation from segment statistics
+// instead of scanning column data:
+//  - MINMAX: emits exactly 2 rows, filled from each column's zone map.
+//  - COUNT:  emits `_segment->num_rows()` rows; columns are only resized, their
+//            content is not filled from the segment.
+//  - any other op: emits `_segment->num_rows()` rows filled from the zone map.
+// Except for MINMAX, rows are produced in batches of at most MAX_ROW_SIZE_IN_COUNT.
+class VStatisticsIterator : public RowwiseIterator {
+public:
+    VStatisticsIterator(std::shared_ptr<Segment> segment, const Schema& schema)
+            : _segment(std::move(segment)), _schema(schema) {}
+
+    ~VStatisticsIterator() override {
+        // The raw column iterators handed out by Segment::new_column_iterator() in
+        // init() are released here.
+        for (auto& pair : _column_iterators_map) {
+            delete pair.second;
+        }
+    }
+
+    // Creates one column iterator per distinct column unique id in the schema and
+    // fixes the number of rows this iterator will emit. Later calls are no-ops once
+    // initialization has succeeded.
+    Status init(const StorageReadOptions& opts) override {
+        if (!_init) {
+            _push_down_agg_type_opt = opts.push_down_agg_type_opt;
+
+            // Start from a clean list so that a retry after a failed init() does not
+            // append the same iterators twice.
+            _column_iterators.clear();
+            for (size_t i = 0; i < _schema.num_column_ids(); i++) {
+                auto cid = _schema.column_id(i);
+                auto unique_id = _schema.column(cid)->unique_id();
+                if (_column_iterators_map.count(unique_id) < 1) {
+                    RETURN_IF_ERROR(_segment->new_column_iterator(
+                            opts.tablet_schema->column(cid), &_column_iterators_map[unique_id]));
+                }
+                _column_iterators.push_back(_column_iterators_map[unique_id]);
+            }
+
+            _target_rows =
+                    _push_down_agg_type_opt == TPushAggOp::MINMAX ? 2 : _segment->num_rows();
+            _init = true;
+        }
+
+        return Status::OK();
+    }
+
+    // Produces the next batch of synthesized rows into `block`, or EndOfFile once
+    // `_target_rows` rows have been emitted.
+    Status next_batch(Block* block) override {
+        DCHECK(block->columns() == _column_iterators.size());
+        if (_output_rows >= _target_rows) {
+            return Status::EndOfFile("End of VStatisticsIterator");
+        }
+
+        block->clear_column_data();
+        auto columns = block->mutate_columns();
+
+        size_t size = _push_down_agg_type_opt == TPushAggOp::MINMAX
+                              ? 2
+                              : std::min(_target_rows - _output_rows, MAX_ROW_SIZE_IN_COUNT);
+        if (_push_down_agg_type_opt == TPushAggOp::COUNT) {
+            // Only the row count matters, so no zone map is consulted.
+            for (size_t i = 0; i < columns.size(); ++i) {
+                columns[i]->resize(size);
+            }
+        } else {
+            for (size_t i = 0; i < columns.size(); ++i) {
+                // Propagate failures (e.g. unsupported iterator type) instead of
+                // returning a block with partially filled columns.
+                RETURN_IF_ERROR(_column_iterators[i]->next_batch_of_zone_map(&size, columns[i]));
+            }
+        }
+        _output_rows += size;
+        return Status::OK();
+    }
+
+    const Schema& schema() const override { return _schema; }
+
+private:
+    std::shared_ptr<Segment> _segment;
+    const Schema& _schema;
+    size_t _target_rows = 0;
+    size_t _output_rows = 0;
+    bool _init = false;
+    TPushAggOp::type _push_down_agg_type_opt = TPushAggOp::NONE;
+    // unique_id -> iterator; holds every iterator created in init().
+    std::map<int32_t, ColumnIterator*> _column_iterators_map;
+    // Iterators in schema column order; aliases entries of `_column_iterators_map`.
+    std::vector<ColumnIterator*> _column_iterators;
+
+    static constexpr size_t MAX_ROW_SIZE_IN_COUNT = 65535;
+};
+
// Used to store merge state for a VMergeIterator input.
// This class will iterate all data from internal iterator
// through client call advance().
@@ -575,6 +650,10 @@ RowwiseIterator*
new_union_iterator(std::vector<RowwiseIterator*>& inputs) {
return new VUnionIterator(inputs);
}
+// Creates an iterator that serves pushed-down aggregations from `segment` statistics.
+// Client should delete returned iterator.
+RowwiseIterator* new_vstatistics_iterator(std::shared_ptr<Segment> segment, const Schema& schema) {
+    // `segment` is already a by-value copy; move it to avoid a second refcount bump.
+    return new VStatisticsIterator(std::move(segment), schema);
+}
+
RowwiseIterator* new_auto_increment_iterator(const Schema& schema, size_t
num_rows) {
return new VAutoIncrementIterator(schema, num_rows);
}
diff --git a/be/src/vec/olap/vgeneric_iterators.h
b/be/src/vec/olap/vgeneric_iterators.h
index abab6d20fe..1342589355 100644
--- a/be/src/vec/olap/vgeneric_iterators.h
+++ b/be/src/vec/olap/vgeneric_iterators.h
@@ -19,6 +19,10 @@
namespace doris {
+namespace segment_v2 {
+class Segment;
+}
+
namespace vectorized {
// Create a merge iterator for input iterators. Merge iterator will merge
@@ -43,6 +47,8 @@ RowwiseIterator*
new_union_iterator(std::vector<RowwiseIterator*>& inputs);
// Client should delete returned iterator.
RowwiseIterator* new_auto_increment_iterator(const Schema& schema, size_t
num_rows);
+RowwiseIterator* new_vstatistics_iterator(std::shared_ptr<Segment> segment,
const Schema& schema);
+
} // namespace vectorized
} // namespace doris
diff --git a/be/test/vec/exec/vgeneric_iterators_test.cpp
b/be/test/vec/exec/vgeneric_iterators_test.cpp
index 6d8b307116..760a7059e4 100644
--- a/be/test/vec/exec/vgeneric_iterators_test.cpp
+++ b/be/test/vec/exec/vgeneric_iterators_test.cpp
@@ -1,4 +1,3 @@
-
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/PrimitiveType.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/PrimitiveType.java
index a063cfaba9..b2a3215247 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PrimitiveType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PrimitiveType.java
@@ -1068,6 +1068,10 @@ public enum PrimitiveType {
return this == ARRAY;
}
+ public boolean isComplexType() {
+ return this == HLL || this == BITMAP;
+ }
+
public boolean isStringType() {
return (this == VARCHAR || this == CHAR || this == HLL || this ==
STRING);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 3e859101e7..eaf701e077 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -65,6 +65,7 @@ import org.apache.doris.thrift.TPaloScanRange;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
import org.apache.doris.thrift.TPrimitiveType;
+import org.apache.doris.thrift.TPushAggOp;
import org.apache.doris.thrift.TScanRange;
import org.apache.doris.thrift.TScanRangeLocation;
import org.apache.doris.thrift.TScanRangeLocations;
@@ -153,6 +154,8 @@ public class OlapScanNode extends ScanNode {
// It's limit for scanner instead of scanNode so we add a new limit.
private long sortLimit = -1;
+ private TPushAggOp pushDownAggNoGroupingOp = null;
+
// List of tablets will be scanned by current olap_scan_node
private ArrayList<Long> scanTabletIds = Lists.newArrayList();
@@ -175,6 +178,10 @@ public class OlapScanNode extends ScanNode {
this.reasonOfPreAggregation + " " +
reason;
}
+ public void setPushDownAggNoGrouping(TPushAggOp pushDownAggNoGroupingOp) {
+ this.pushDownAggNoGroupingOp = pushDownAggNoGroupingOp;
+ }
+
public boolean isPreAggregation() {
return isPreAggregation;
}
@@ -926,6 +933,10 @@ public class OlapScanNode extends ScanNode {
msg.olap_scan_node.setKeyType(olapTable.getKeysType().toThrift());
msg.olap_scan_node.setTableName(olapTable.getName());
msg.olap_scan_node.setEnableUniqueKeyMergeOnWrite(olapTable.getEnableUniqueKeyMergeOnWrite());
+
+ if (pushDownAggNoGroupingOp != null) {
+ msg.olap_scan_node.setPushDownAggTypeOpt(pushDownAggNoGroupingOp);
+ }
}
// export some tablets
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index 0a68abfc64..3c5b702947 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -56,8 +56,10 @@ import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.FunctionSet;
import org.apache.doris.catalog.JdbcTable;
+import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MysqlTable;
import org.apache.doris.catalog.OdbcTable;
+import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeConstants;
@@ -66,7 +68,9 @@ import org.apache.doris.common.Reference;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.VectorizedUtil;
import org.apache.doris.planner.external.ExternalFileScanNode;
+import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TNullSide;
+import org.apache.doris.thrift.TPushAggOp;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
@@ -362,6 +366,154 @@ public class SingleNodePlanner {
return selectNode;
}
+    /**
+     * Folds the push-down op implied by one aggregate function into the op accumulated so far.
+     * "COUNT" (case-insensitive) maps to COUNT and every other name maps to MINMAX. The result
+     * is that op when nothing was accumulated yet or both agree, and MIX otherwise.
+     */
+    private TPushAggOp freshTPushAggOpByName(String functionName, TPushAggOp originAggOp) {
+        TPushAggOp current = functionName.equalsIgnoreCase("COUNT") ? TPushAggOp.COUNT : TPushAggOp.MINMAX;
+        boolean compatible = originAggOp == null || originAggOp == current;
+        return compatible ? current : TPushAggOp.MIX;
+    }
+
+    /**
+     * Marks the OLAP scan node so that a no-grouping min / max / count aggregation can be
+     * answered by the storage layer, when every precondition below holds. If any check fails
+     * the scan node is left untouched.
+     */
+    private void pushDownAggNoGrouping(AggregateInfo aggInfo, SelectStmt selectStmt, Analyzer analyzer,
+            PlanNode root) {
+        // TODO: Support other scan node in the future
+        if (!(root instanceof OlapScanNode)) {
+            return;
+        }
+        OlapScanNode olapNode = (OlapScanNode) root;
+
+        KeysType type = olapNode.getOlapTable().getKeysType();
+        if (type == KeysType.UNIQUE_KEYS || type == KeysType.PRIMARY_KEYS) {
+            return;
+        }
+
+        // TODO: Support multi table in the future
+        if (selectStmt.getTableRefs().size() != 1) {
+            return;
+        }
+
+        // Do not support group by and where clause
+        if (null == aggInfo || !aggInfo.getGroupingExprs().isEmpty()) {
+            return;
+        }
+        List<Expr> allConjuncts = analyzer.getAllConjuncts(selectStmt.getTableRefs().get(0).getId());
+        if (allConjuncts != null) {
+            return;
+        }
+
+        TPushAggOp aggOp = null;
+        for (FunctionCallExpr aggExpr : aggInfo.getAggregateExprs()) {
+            // Only support `min`, `max`, `count` and `count` only effective in dup table
+            String functionName = aggExpr.getFnName().getFunction();
+            boolean isCount = functionName.equalsIgnoreCase("COUNT");
+            if (!isCount
+                    && !functionName.equalsIgnoreCase("MAX")
+                    && !functionName.equalsIgnoreCase("MIN")) {
+                return;
+            }
+            if (isCount && type != KeysType.DUP_KEYS) {
+                return;
+            }
+
+            aggOp = freshTPushAggOpByName(functionName, aggOp);
+
+            if (aggExpr.getChildren().size() > 1) {
+                return;
+            }
+            if (aggExpr.getChildren().isEmpty()) {
+                // e.g. count(*): there is no argument column to validate.
+                continue;
+            }
+
+            // The argument must be a column, or a numeric-to-numeric cast of a column.
+            Expr child = aggExpr.getChild(0);
+            SlotRef slotRef;
+            if (child instanceof SlotRef) {
+                slotRef = (SlotRef) child;
+            } else if ((child instanceof CastExpr) && (child.getChild(0) instanceof SlotRef)
+                    && child.getType().isNumericType()
+                    && child.getChild(0).getType().isNumericType()) {
+                slotRef = (SlotRef) child.getChild(0);
+            } else {
+                return;
+            }
+
+            // TODO(zc): Here column is null is too bad
+            // Only column of Inline-view will be null
+            Column col = slotRef.getDesc().getColumn();
+            if (col == null) {
+                continue;
+            }
+
+            // NOTE(review): the previous AGG_KEYS guard compared the column name with itself and
+            // therefore never rejected anything, so value columns of AGG_KEYS tables are accepted
+            // here exactly as before. Confirm whether min/max push-down is valid for them.
+
+            // The zone map max length of CharFamily is 512, do not
+            // over the length: https://github.com/apache/doris/pull/6293
+            if (aggOp == TPushAggOp.MINMAX || aggOp == TPushAggOp.MIX) {
+                PrimitiveType colType = col.getDataType();
+                if (colType.isArrayType() || colType.isComplexType()
+                        || colType == PrimitiveType.STRING) {
+                    return;
+                }
+                if (colType.isCharFamily() && col.getType().getLength() > 512) {
+                    return;
+                }
+            }
+
+            if (aggOp == TPushAggOp.COUNT || aggOp == TPushAggOp.MIX) {
+                // NULL value behavior in `count` function is zero, so
+                // we should not use row_count to speed up query. the col
+                // must be not null
+                if (col.isAllowNull()) {
+                    return;
+                }
+            }
+        }
+
+        olapNode.setPushDownAggNoGrouping(aggOp);
+    }
+
private void turnOffPreAgg(AggregateInfo aggInfo, SelectStmt selectStmt,
Analyzer analyzer, PlanNode root) {
String turnOffReason = null;
do {
@@ -998,6 +1150,10 @@ public class SingleNodePlanner {
materializeTableResultForCrossJoinOrCountStar(ref, analyzer);
PlanNode plan = createTableRefNode(analyzer, ref, selectStmt);
turnOffPreAgg(aggInfo, selectStmt, analyzer, plan);
+ if (VectorizedUtil.isVectorized()
+ &&
ConnectContext.get().getSessionVariable().enablePushDownNoGroupAgg) {
+ pushDownAggNoGrouping(aggInfo, selectStmt, analyzer, plan);
+ }
if (plan instanceof OlapScanNode) {
OlapScanNode olapNode = (OlapScanNode) plan;
@@ -1026,6 +1182,10 @@ public class SingleNodePlanner {
//
selectStmt.seondSubstituteInlineViewExprs(analyzer.getChangeResSmap());
turnOffPreAgg(aggInfo, selectStmt, analyzer, root);
+ if (VectorizedUtil.isVectorized()
+ &&
ConnectContext.get().getSessionVariable().enablePushDownNoGroupAgg) {
+ pushDownAggNoGrouping(aggInfo, selectStmt, analyzer, root);
+ }
if (root instanceof OlapScanNode) {
OlapScanNode olapNode = (OlapScanNode) root;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index e6752df67f..95fb643817 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -221,6 +221,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String ENABLE_NEW_SHUFFLE_HASH_METHOD =
"enable_new_shuffle_hash_method";
+ public static final String ENABLE_PUSH_DOWN_NO_GROUP_AGG =
"enable_push_down_no_group_agg";
+
// session origin value
public Map<Field, String> sessionOriginValue = new HashMap<Field,
String>();
// check stmt is or not [select /*+ SET_VAR(...)*/ ...]
@@ -563,7 +565,10 @@ public class SessionVariable implements Serializable,
Writable {
public boolean enableFallbackToOriginalPlanner = true;
@VariableMgr.VarAttr(name = ENABLE_NEW_SHUFFLE_HASH_METHOD)
- public boolean enableNewShffleHashMethod = true;
+ public boolean enableNewShuffleHashMethod = true;
+
+ @VariableMgr.VarAttr(name = ENABLE_PUSH_DOWN_NO_GROUP_AGG)
+ public boolean enablePushDownNoGroupAgg = true;
public String getBlockEncryptionMode() {
return blockEncryptionMode;
@@ -958,6 +963,10 @@ public class SessionVariable implements Serializable,
Writable {
this.enableVectorizedEngine = enableVectorizedEngine;
}
+ public boolean enablePushDownNoGroupAgg() {
+ return enablePushDownNoGroupAgg;
+ }
+
public boolean getEnableFunctionPushdown() {
return this.enableFunctionPushdown;
}
@@ -1170,7 +1179,7 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setEnableFunctionPushdown(enableFunctionPushdown);
tResult.setFragmentTransmissionCompressionCodec(fragmentTransmissionCompressionCodec);
tResult.setEnableLocalExchange(enableLocalExchange);
- tResult.setEnableNewShuffleHashMethod(enableNewShffleHashMethod);
+ tResult.setEnableNewShuffleHashMethod(enableNewShuffleHashMethod);
tResult.setSkipStorageEngineMerge(skipStorageEngineMerge);
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 223a738df0..4135099c8b 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -463,6 +463,13 @@ struct TSortInfo {
4: optional list<Exprs.TExpr> sort_tuple_slot_exprs
}
+enum TPushAggOp {
+ NONE = 0,
+ MINMAX = 1,
+ COUNT = 2,
+ MIX = 3
+}
+
struct TOlapScanNode {
1: required Types.TTupleId tuple_id
2: required list<string> key_column_name
@@ -477,6 +484,7 @@ struct TOlapScanNode {
// It's limit for scanner instead of scanNode so we add a new limit.
10: optional i64 sort_limit
11: optional bool enable_unique_key_merge_on_write
+ 12: optional TPushAggOp push_down_agg_type_opt
}
struct TEqJoinCondition {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]