This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch dev_syxj_2
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 5dba701a82eccf62d5537972c9c05707724725b0
Author: daidai <[email protected]>
AuthorDate: Sat Jul 29 00:31:01 2023 +0800

    [opt](hive)opt select count(*) stmt push down agg on parquet in hive . 
(#22115)
    
    Optimize the "select count(*) from table" statement by pushing the "count"
    aggregate type down to BE.
    Supported file types: parquet and orc in hive.
    
    1. 4k files, 60kw lines
        before:  1 min 37.70 sec
        after:   50.18 sec
    
    2. 50 files, 60kw lines
        before: 1.12 sec
        after: 0.82 sec
---
 be/src/vec/exec/format/generic_reader.h            |   8 ++
 be/src/vec/exec/format/orc/vorc_reader.cpp         |  18 +++
 be/src/vec/exec/format/orc/vorc_reader.h           |   3 +
 .../exec/format/parquet/vparquet_group_reader.h    |   2 +
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |  18 +++
 be/src/vec/exec/scan/new_olap_scan_node.cpp        |   8 +-
 be/src/vec/exec/scan/new_olap_scanner.cpp          |  12 +-
 be/src/vec/exec/scan/scanner_scheduler.cpp         |   1 +
 be/src/vec/exec/scan/vfile_scanner.cpp             |   2 +
 be/src/vec/exec/scan/vscan_node.cpp                |  13 ++
 be/src/vec/exec/scan/vscan_node.h                  |   3 +
 .../rules/implementation/AggregateStrategies.java  |  88 +++++++++---
 .../org/apache/doris/planner/OlapScanNode.java     |  41 ++++--
 .../java/org/apache/doris/planner/PlanNode.java    |  17 +++
 .../apache/doris/planner/SingleNodePlanner.java    |  21 +--
 .../doris/planner/external/FileScanNode.java       |   2 +
 .../doris/planner/external/HiveScanNode.java       |  23 ++-
 gensrc/thrift/PlanNodes.thrift                     |   4 +-
 .../hive/test_select_count_optimize.out            | 157 +++++++++++++++++++++
 .../data/performance_p0/redundant_conjuncts.out    |   2 +
 .../hive/test_select_count_optimize.groovy         |  91 ++++++++++++
 21 files changed, 471 insertions(+), 63 deletions(-)

diff --git a/be/src/vec/exec/format/generic_reader.h 
b/be/src/vec/exec/format/generic_reader.h
index 81ffdcf6fb..7b6f3c7b9c 100644
--- a/be/src/vec/exec/format/generic_reader.h
+++ b/be/src/vec/exec/format/generic_reader.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <gen_cpp/PlanNodes_types.h>
+
 #include "common/factory_creator.h"
 #include "common/status.h"
 #include "runtime/types.h"
@@ -30,7 +32,12 @@ class Block;
 // a set of blocks with specified schema,
 class GenericReader {
 public:
+    GenericReader() : _push_down_agg_type(TPushAggOp::type::NONE) {}
+    void set_push_down_agg_type(TPushAggOp::type push_down_agg_type) {
+        _push_down_agg_type = push_down_agg_type;
+    }
     virtual Status get_next_block(Block* block, size_t* read_rows, bool* eof) 
= 0;
+
     virtual std::unordered_map<std::string, TypeDescriptor> get_name_to_type() 
{
         std::unordered_map<std::string, TypeDescriptor> map;
         return map;
@@ -67,6 +74,7 @@ protected:
 
     /// Whether the underlying FileReader has filled the partition&missing 
columns
     bool _fill_all_columns = false;
+    TPushAggOp::type _push_down_agg_type;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp 
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index cdb3b28f4d..c93c7a7590 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -239,6 +239,8 @@ Status OrcReader::_create_file_reader() {
     } catch (std::exception& e) {
         return Status::InternalError("Init OrcReader failed. reason = {}", 
e.what());
     }
+    _remaining_rows = _reader->getNumberOfRows();
+
     return Status::OK();
 }
 
@@ -1373,6 +1375,22 @@ std::string OrcReader::_get_field_name_lower_case(const 
orc::Type* orc_type, int
 }
 
 Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
+    if (_push_down_agg_type == TPushAggOp::type::COUNT) {
+        auto rows = std::min(get_remaining_rows(), (int64_t)_batch_size);
+
+        set_remaining_rows(get_remaining_rows() - rows);
+
+        for (auto& col : block->mutate_columns()) {
+            col->resize(rows);
+        }
+
+        *read_rows = rows;
+        if (get_remaining_rows() == 0) {
+            *eof = true;
+        }
+        return Status::OK();
+    }
+
     if (_lazy_read_ctx.can_lazy_read) {
         std::vector<uint32_t> columns_to_filter;
         int column_to_keep = block->columns();
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h 
b/be/src/vec/exec/format/orc/vorc_reader.h
index 41d919578f..9b5c1fe576 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -487,8 +487,11 @@ private:
                                                            const NullMap* 
null_map,
                                                            
orc::ColumnVectorBatch* cvb,
                                                            const orc::Type* 
orc_column_typ);
+    int64_t get_remaining_rows() { return _remaining_rows; }
+    void set_remaining_rows(int64_t rows) { _remaining_rows = rows; }
 
 private:
+    int64_t _remaining_rows = 0;
     RuntimeProfile* _profile = nullptr;
     RuntimeState* _state = nullptr;
     const TFileScanRangeParams& _scan_params;
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index ef35f5adf2..baa5912f99 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -158,6 +158,8 @@ public:
     int64_t lazy_read_filtered_rows() const { return _lazy_read_filtered_rows; 
}
 
     ParquetColumnReader::Statistics statistics();
+    void set_remaining_rows(int64_t rows) { _remaining_rows = rows; }
+    int64_t get_remaining_rows() { return _remaining_rows; }
 
 private:
     void _merge_read_ranges(std::vector<RowRange>& row_ranges);
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 38dc2a923a..ba80992a04 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -528,6 +528,24 @@ Status ParquetReader::get_next_block(Block* block, size_t* 
read_rows, bool* eof)
         }
     }
     DCHECK(_current_group_reader != nullptr);
+    if (_push_down_agg_type == TPushAggOp::type::COUNT) {
+        auto rows = std::min(_current_group_reader->get_remaining_rows(), 
(int64_t)_batch_size);
+
+        
_current_group_reader->set_remaining_rows(_current_group_reader->get_remaining_rows()
 -
+                                                  rows);
+
+        for (auto& col : block->mutate_columns()) {
+            col->resize(rows);
+        }
+
+        *read_rows = rows;
+        if (_current_group_reader->get_remaining_rows() == 0) {
+            _current_group_reader.reset(nullptr);
+        }
+
+        return Status::OK();
+    }
+
     {
         SCOPED_RAW_TIMER(&_statistics.column_read_time);
         Status batch_st =
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp 
b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index d53094b73c..3af5bb9f89 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -249,8 +249,7 @@ Status NewOlapScanNode::_process_conjuncts() {
 }
 
 Status NewOlapScanNode::_build_key_ranges_and_filters() {
-    if (!_olap_scan_node.__isset.push_down_agg_type_opt ||
-        _olap_scan_node.push_down_agg_type_opt == TPushAggOp::NONE) {
+    if (_push_down_agg_type == TPushAggOp::NONE) {
         const std::vector<std::string>& column_names = 
_olap_scan_node.key_column_name;
         const std::vector<TPrimitiveType::type>& column_types = 
_olap_scan_node.key_column_type;
         DCHECK(column_types.size() == column_names.size());
@@ -326,9 +325,8 @@ Status NewOlapScanNode::_build_key_ranges_and_filters() {
                        range);
         }
     } else {
-        _runtime_profile->add_info_string(
-                "PushDownAggregate",
-                
push_down_agg_to_string(_olap_scan_node.push_down_agg_type_opt));
+        _runtime_profile->add_info_string("PushDownAggregate",
+                                          
push_down_agg_to_string(_push_down_agg_type));
     }
 
     if (_state->enable_profile()) {
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp 
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 702d94c5f3..51864acb24 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -248,14 +248,13 @@ Status NewOlapScanner::_init_tablet_reader_params(
         const std::vector<FunctionFilter>& function_filters) {
     // if the table with rowset [0-x] or [0-1] [2-y], and [0-1] is empty
     const bool single_version = _tablet_reader_params.has_single_version();
-    auto real_parent = reinterpret_cast<NewOlapScanNode*>(_parent);
+
     if (_state->skip_storage_engine_merge()) {
         _tablet_reader_params.direct_mode = true;
         _aggregation = true;
     } else {
-        _tablet_reader_params.direct_mode =
-                _aggregation || single_version ||
-                real_parent->_olap_scan_node.__isset.push_down_agg_type_opt;
+        _tablet_reader_params.direct_mode = _aggregation || single_version ||
+                                            (_parent->get_push_down_agg_type() 
!= TPushAggOp::NONE);
     }
 
     RETURN_IF_ERROR(_init_return_columns());
@@ -264,10 +263,7 @@ Status NewOlapScanner::_init_tablet_reader_params(
     _tablet_reader_params.tablet_schema = _tablet_schema;
     _tablet_reader_params.reader_type = ReaderType::READER_QUERY;
     _tablet_reader_params.aggregation = _aggregation;
-    if (real_parent->_olap_scan_node.__isset.push_down_agg_type_opt) {
-        _tablet_reader_params.push_down_agg_type_opt =
-                real_parent->_olap_scan_node.push_down_agg_type_opt;
-    }
+    _tablet_reader_params.push_down_agg_type_opt = 
_parent->get_push_down_agg_type();
     _tablet_reader_params.version = Version(0, _version);
 
     // TODO: If a new runtime filter arrives after `_conjuncts` move to 
`_common_expr_ctxs_push_down`,
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp 
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 632e28e00a..3fa10c9d78 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -47,6 +47,7 @@
 #include "vec/core/block.h"
 #include "vec/exec/scan/new_olap_scanner.h" // IWYU pragma: keep
 #include "vec/exec/scan/scanner_context.h"
+#include "vec/exec/scan/vscan_node.h"
 #include "vec/exec/scan/vscanner.h"
 #include "vfile_scanner.h"
 
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp 
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 179da42d5f..e30d7640ed 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -245,6 +245,7 @@ Status VFileScanner::_get_block_impl(RuntimeState* state, 
Block* block, bool* eo
         RETURN_IF_ERROR(_init_src_block(block));
         {
             SCOPED_TIMER(_get_block_timer);
+
             // Read next block.
             // Some of column in block may not be filled (column not exist in 
file)
             RETURN_IF_ERROR(
@@ -737,6 +738,7 @@ Status VFileScanner::_get_next_reader() {
         _name_to_col_type.clear();
         _missing_cols.clear();
         _cur_reader->get_columns(&_name_to_col_type, &_missing_cols);
+        _cur_reader->set_push_down_agg_type(_parent->get_push_down_agg_type());
         RETURN_IF_ERROR(_generate_fill_columns());
         if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
             fmt::memory_buffer col_buf;
diff --git a/be/src/vec/exec/scan/vscan_node.cpp 
b/be/src/vec/exec/scan/vscan_node.cpp
index d9d191b6ab..7d7d3ef5dc 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -113,6 +113,19 @@ Status VScanNode::init(const TPlanNode& tnode, 
RuntimeState* state) {
     } else {
         _max_pushdown_conditions_per_column = 
config::max_pushdown_conditions_per_column;
     }
+
+    // tnode.olap_scan_node.push_down_agg_type_opt is deprecated in favor of
+    // the new field tnode.push_down_agg_type_opt.
+    //
+    // Prefer the new field; fall back to the old one to stay compatible.
+    if (tnode.__isset.push_down_agg_type_opt) {
+        _push_down_agg_type = tnode.push_down_agg_type_opt;
+    } else if (tnode.olap_scan_node.__isset.push_down_agg_type_opt) {
+        _push_down_agg_type = tnode.olap_scan_node.push_down_agg_type_opt;
+
+    } else {
+        _push_down_agg_type = TPushAggOp::type::NONE;
+    }
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/scan/vscan_node.h 
b/be/src/vec/exec/scan/vscan_node.h
index 83e8909280..822ae8c1d7 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -135,6 +135,7 @@ public:
         }
     }
 
+    TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; }
     // Get next block.
     // If eos is true, no more data will be read and block should be empty.
     Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) 
override;
@@ -351,6 +352,8 @@ protected:
     std::unordered_map<std::string, int> _colname_to_slot_id;
     std::vector<int> _col_distribute_ids;
 
+    TPushAggOp::type _push_down_agg_type;
+
 private:
     Status _normalize_conjuncts();
     Status _normalize_predicate(const VExprSPtr& conjunct_expr_root, 
VExprContext* context,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
index 6df6b2f817..3912315f4a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
@@ -55,8 +55,11 @@ import org.apache.doris.nereids.trees.plans.GroupPlan;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.algebra.Project;
 import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.logical.LogicalRelation;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
 import 
org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggregate;
@@ -115,6 +118,20 @@ public class AggregateStrategies implements 
ImplementationRuleFactory {
                     return storageLayerAggregate(agg, project, olapScan, 
ctx.cascadesContext);
                 })
             ),
+            RuleType.STORAGE_LAYER_AGGREGATE_WITH_PROJECT.build(
+                logicalAggregate(
+                    logicalProject(
+                        logicalFileScan()
+                    )
+                )
+                    .when(agg -> agg.isNormalized() && 
enablePushDownNoGroupAgg())
+                    .thenApply(ctx -> {
+                        LogicalAggregate<LogicalProject<LogicalFileScan>> agg 
= ctx.root;
+                        LogicalProject<LogicalFileScan> project = agg.child();
+                        LogicalFileScan fileScan = project.child();
+                        return storageLayerAggregate(agg, project, fileScan, 
ctx.cascadesContext);
+                    })
+            ),
             RuleType.ONE_PHASE_AGGREGATE_WITHOUT_DISTINCT.build(
                 basePattern
                     .when(agg -> agg.getDistinctArguments().size() == 0)
@@ -190,14 +207,19 @@ public class AggregateStrategies implements 
ImplementationRuleFactory {
     private LogicalAggregate<? extends Plan> storageLayerAggregate(
             LogicalAggregate<? extends Plan> aggregate,
             @Nullable LogicalProject<? extends Plan> project,
-            LogicalOlapScan olapScan, CascadesContext cascadesContext) {
+            LogicalRelation logicalScan, CascadesContext cascadesContext) {
         final LogicalAggregate<? extends Plan> canNotPush = aggregate;
 
-        KeysType keysType = olapScan.getTable().getKeysType();
-        if (keysType != KeysType.AGG_KEYS && keysType != KeysType.DUP_KEYS) {
+        if (!(logicalScan instanceof LogicalOlapScan) && !(logicalScan 
instanceof LogicalFileScan)) {
             return canNotPush;
         }
 
+        if (logicalScan instanceof LogicalOlapScan) {
+            KeysType keysType = ((LogicalOlapScan) 
logicalScan).getTable().getKeysType();
+            if (keysType != KeysType.AGG_KEYS && keysType != 
KeysType.DUP_KEYS) {
+                return canNotPush;
+            }
+        }
         List<Expression> groupByExpressions = 
aggregate.getGroupByExpressions();
         if (!groupByExpressions.isEmpty() || 
!aggregate.getDistinctArguments().isEmpty()) {
             return canNotPush;
@@ -213,8 +235,11 @@ public class AggregateStrategies implements 
ImplementationRuleFactory {
         if (!supportedAgg.keySet().containsAll(functionClasses)) {
             return canNotPush;
         }
-        if (functionClasses.contains(Count.class) && keysType != 
KeysType.DUP_KEYS) {
-            return canNotPush;
+        if (logicalScan instanceof LogicalOlapScan) {
+            KeysType keysType = ((LogicalOlapScan) 
logicalScan).getTable().getKeysType();
+            if (functionClasses.contains(Count.class) && keysType != 
KeysType.DUP_KEYS) {
+                return canNotPush;
+            }
         }
         if (aggregateFunctions.stream().anyMatch(fun -> fun.arity() > 1)) {
             return canNotPush;
@@ -281,12 +306,15 @@ public class AggregateStrategies implements 
ImplementationRuleFactory {
                 ExpressionUtils.collect(argumentsOfAggregateFunction, 
SlotReference.class::isInstance);
 
         List<SlotReference> usedSlotInTable = (List<SlotReference>) (List) 
Project.findProject(aggUsedSlots,
-                (List<NamedExpression>) (List) olapScan.getOutput());
+                (List<NamedExpression>) (List) logicalScan.getOutput());
 
         for (SlotReference slot : usedSlotInTable) {
             Column column = slot.getColumn().get();
-            if (keysType == KeysType.AGG_KEYS && !column.isKey()) {
-                return canNotPush;
+            if (logicalScan instanceof LogicalOlapScan) {
+                KeysType keysType = ((LogicalOlapScan) 
logicalScan).getTable().getKeysType();
+                if (keysType == KeysType.AGG_KEYS && !column.isKey()) {
+                    return canNotPush;
+                }
             }
             // The zone map max length of CharFamily is 512, do not
             // over the length: https://github.com/apache/doris/pull/6293
@@ -310,19 +338,41 @@ public class AggregateStrategies implements 
ImplementationRuleFactory {
             }
         }
 
-        PhysicalOlapScan physicalOlapScan = (PhysicalOlapScan) new 
LogicalOlapScanToPhysicalOlapScan()
-                .build()
-                .transform(olapScan, cascadesContext)
-                .get(0);
-        if (project != null) {
-            return aggregate.withChildren(ImmutableList.of(
+        if (logicalScan instanceof LogicalOlapScan) {
+            PhysicalOlapScan physicalScan = (PhysicalOlapScan) new 
LogicalOlapScanToPhysicalOlapScan()
+                    .build()
+                    .transform((LogicalOlapScan) logicalScan, cascadesContext)
+                    .get(0);
+
+            if (project != null) {
+                return aggregate.withChildren(ImmutableList.of(
                     project.withChildren(
-                            ImmutableList.of(new 
PhysicalStorageLayerAggregate(physicalOlapScan, mergeOp)))
-            ));
+                        ImmutableList.of(new 
PhysicalStorageLayerAggregate(physicalScan, mergeOp)))
+                ));
+            } else {
+                return aggregate.withChildren(ImmutableList.of(
+                    new PhysicalStorageLayerAggregate(physicalScan, mergeOp)
+                ));
+            }
+
+        } else if (logicalScan instanceof LogicalFileScan) {
+            PhysicalFileScan physicalScan = (PhysicalFileScan) new 
LogicalFileScanToPhysicalFileScan()
+                    .build()
+                    .transform((LogicalFileScan) logicalScan, cascadesContext)
+                    .get(0);
+            if (project != null) {
+                return aggregate.withChildren(ImmutableList.of(
+                    project.withChildren(
+                        ImmutableList.of(new 
PhysicalStorageLayerAggregate(physicalScan, mergeOp)))
+                ));
+            } else {
+                return aggregate.withChildren(ImmutableList.of(
+                    new PhysicalStorageLayerAggregate(physicalScan, mergeOp)
+                ));
+            }
+
         } else {
-            return aggregate.withChildren(ImmutableList.of(
-                    new PhysicalStorageLayerAggregate(physicalOlapScan, 
mergeOp)
-            ));
+            return canNotPush;
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 6f10836a90..7b3e95c3a0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -25,6 +25,7 @@ import org.apache.doris.analysis.CastExpr;
 import org.apache.doris.analysis.CreateMaterializedViewStmt;
 import org.apache.doris.analysis.DescriptorTable;
 import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.FunctionCallExpr;
 import org.apache.doris.analysis.InPredicate;
 import org.apache.doris.analysis.IntLiteral;
 import org.apache.doris.analysis.PartitionNames;
@@ -77,7 +78,6 @@ import org.apache.doris.thrift.TPaloScanRange;
 import org.apache.doris.thrift.TPlanNode;
 import org.apache.doris.thrift.TPlanNodeType;
 import org.apache.doris.thrift.TPrimitiveType;
-import org.apache.doris.thrift.TPushAggOp;
 import org.apache.doris.thrift.TScanRange;
 import org.apache.doris.thrift.TScanRangeLocation;
 import org.apache.doris.thrift.TScanRangeLocations;
@@ -170,7 +170,6 @@ public class OlapScanNode extends ScanNode {
 
     private boolean useTopnOpt = false;
 
-    private TPushAggOp pushDownAggNoGroupingOp = null;
 
     // List of tablets will be scanned by current olap_scan_node
     private ArrayList<Long> scanTabletIds = Lists.newArrayList();
@@ -223,9 +222,6 @@ public class OlapScanNode extends ScanNode {
                                       this.reasonOfPreAggregation + " " + 
reason;
     }
 
-    public void setPushDownAggNoGrouping(TPushAggOp pushDownAggNoGroupingOp) {
-        this.pushDownAggNoGroupingOp = pushDownAggNoGroupingOp;
-    }
 
     public boolean isPreAggregation() {
         return isPreAggregation;
@@ -1363,9 +1359,10 @@ public class OlapScanNode extends ScanNode {
         msg.olap_scan_node.setTableName(olapTable.getName());
         
msg.olap_scan_node.setEnableUniqueKeyMergeOnWrite(olapTable.getEnableUniqueKeyMergeOnWrite());
 
-        if (pushDownAggNoGroupingOp != null) {
-            msg.olap_scan_node.setPushDownAggTypeOpt(pushDownAggNoGroupingOp);
-        }
+        msg.setPushDownAggTypeOpt(pushDownAggNoGroupingOp);
+
+        msg.olap_scan_node.setPushDownAggTypeOpt(pushDownAggNoGroupingOp);
+        // Note: TOlapScanNode.push_down_agg_type_opt (set just above) is deprecated.
 
         if (outputColumnUniqueIds != null) {
             msg.olap_scan_node.setOutputColumnUniqueIds(outputColumnUniqueIds);
@@ -1580,4 +1577,32 @@ public class OlapScanNode extends ScanNode {
                 olapTable.getId(), selectedIndexId == -1 ? 
olapTable.getBaseIndexId() : selectedIndexId,
                 scanReplicaIds);
     }
+
+    @Override
+    public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) {
+        KeysType type = getOlapTable().getKeysType();
+        if (type == KeysType.UNIQUE_KEYS || type == KeysType.PRIMARY_KEYS) {
+            return false;
+        }
+
+        String aggFunctionName = aggExpr.getFnName().getFunction();
+        if (aggFunctionName.equalsIgnoreCase("COUNT") && type != 
KeysType.DUP_KEYS) {
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    public boolean pushDownAggNoGroupingCheckCol(FunctionCallExpr aggExpr, 
Column col) {
+        KeysType type = getOlapTable().getKeysType();
+
+        // The value column of the agg does not support zone_map index.
+        if (type == KeysType.AGG_KEYS && !col.isKey()) {
+            return false;
+        }
+
+        return true;
+    }
 }
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index a279af676e..e11f674875 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -27,11 +27,13 @@ import org.apache.doris.analysis.CompoundPredicate;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.ExprId;
 import org.apache.doris.analysis.ExprSubstitutionMap;
+import org.apache.doris.analysis.FunctionCallExpr;
 import org.apache.doris.analysis.FunctionName;
 import org.apache.doris.analysis.SlotId;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.analysis.TupleId;
+import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Function;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Type;
@@ -46,6 +48,7 @@ import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TFunctionBinaryType;
 import org.apache.doris.thrift.TPlan;
 import org.apache.doris.thrift.TPlanNode;
+import org.apache.doris.thrift.TPushAggOp;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -1175,4 +1178,18 @@ public abstract class PlanNode extends 
TreeNode<PlanNode> implements PlanStats {
     public void setCardinalityAfterFilter(long cardinalityAfterFilter) {
         this.cardinalityAfterFilter = cardinalityAfterFilter;
     }
+
+    protected TPushAggOp pushDownAggNoGroupingOp = TPushAggOp.NONE;
+
+    public void setPushDownAggNoGrouping(TPushAggOp pushDownAggNoGroupingOp) {
+        this.pushDownAggNoGroupingOp = pushDownAggNoGroupingOp;
+    }
+
+    public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) {
+        return false;
+    }
+
+    public boolean pushDownAggNoGroupingCheckCol(FunctionCallExpr aggExpr, 
Column col) {
+        return false;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index 8c4b4bb0f0..3bce97a0c4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -56,7 +56,6 @@ import org.apache.doris.catalog.AggregateFunction;
 import org.apache.doris.catalog.AggregateType;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.FunctionSet;
-import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.MysqlTable;
 import org.apache.doris.catalog.OdbcTable;
 import org.apache.doris.catalog.PrimitiveType;
@@ -423,16 +422,6 @@ public class SingleNodePlanner {
 
     private void pushDownAggNoGrouping(AggregateInfo aggInfo, SelectStmt 
selectStmt, Analyzer analyzer, PlanNode root) {
         do {
-            // TODO: Support other scan node in the future
-            if (!(root instanceof OlapScanNode)) {
-                break;
-            }
-
-            KeysType type = ((OlapScanNode) root).getOlapTable().getKeysType();
-            if (type == KeysType.UNIQUE_KEYS || type == KeysType.PRIMARY_KEYS) 
{
-                break;
-            }
-
             if (CollectionUtils.isNotEmpty(root.getConjuncts())) {
                 break;
             }
@@ -457,7 +446,6 @@ public class SingleNodePlanner {
             boolean aggExprValidate = true;
             TPushAggOp aggOp = null;
             for (FunctionCallExpr aggExpr : aggExprs) {
-                // Only support `min`, `max`, `count` and `count` only 
effective in dup table
                 String functionName = aggExpr.getFnName().getFunction();
                 if (!functionName.equalsIgnoreCase("MAX")
                         && !functionName.equalsIgnoreCase("MIN")
@@ -466,8 +454,7 @@ public class SingleNodePlanner {
                     break;
                 }
 
-                if (functionName.equalsIgnoreCase("COUNT")
-                        && type != KeysType.DUP_KEYS) {
+                if (!root.pushDownAggNoGrouping(aggExpr)) {
                     aggExprValidate = false;
                     break;
                 }
@@ -512,8 +499,7 @@ public class SingleNodePlanner {
                             continue;
                         }
 
-                        // The value column of the agg does not support 
zone_map index.
-                        if (type == KeysType.AGG_KEYS && !col.isKey()) {
+                        if (!root.pushDownAggNoGroupingCheckCol(aggExpr, col)) 
{
                             returnColumnValidate = false;
                             break;
                         }
@@ -556,8 +542,7 @@ public class SingleNodePlanner {
                 break;
             }
 
-            OlapScanNode olapNode = (OlapScanNode) root;
-            olapNode.setPushDownAggNoGrouping(aggOp);
+            root.setPushDownAggNoGrouping(aggOp);
         } while (false);
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java
index 5af4a5f123..38be399244 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java
@@ -76,6 +76,8 @@ public abstract class FileScanNode extends ExternalScanNode {
 
     @Override
     protected void toThrift(TPlanNode planNode) {
+        planNode.setPushDownAggTypeOpt(pushDownAggNoGroupingOp);
+
         planNode.setNodeType(TPlanNodeType.FILE_SCAN_NODE);
         TFileScanNode fileScanNode = new TFileScanNode();
         fileScanNode.setTupleId(desc.getId().asInt());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
index 6aa8c3185e..211f6e8056 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.planner.external;
 
+import org.apache.doris.analysis.FunctionCallExpr;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Column;
@@ -67,13 +68,13 @@ public class HiveScanNode extends FileQueryScanNode {
 
     public static final String PROP_FIELD_DELIMITER = "field.delim";
     public static final String DEFAULT_FIELD_DELIMITER = "\1"; // "\x01"
-
     public static final String PROP_LINE_DELIMITER = "line.delim";
     public static final String DEFAULT_LINE_DELIMITER = "\n";
 
     public static final String PROP_ARRAY_DELIMITER_HIVE2 = "colelction.delim";
     public static final String PROP_ARRAY_DELIMITER_HIVE3 = "collection.delim";
     public static final String DEFAULT_ARRAY_DELIMITER = "\2";
+
     protected final HMSExternalTable hmsTable;
     private HiveTransaction hiveTransaction = null;
 
@@ -105,12 +106,11 @@ public class HiveScanNode extends FileQueryScanNode {
             for (SlotDescriptor slot : desc.getSlots()) {
                 if (slot.getType().isMapType() || 
slot.getType().isStructType()) {
                     throw new UserException("For column `" + 
slot.getColumn().getName()
-                            + "`, The column types MAP/STRUCT are not 
supported yet"
-                            + " for text input format of Hive. ");
+                        + "`, The column types MAP/STRUCT are not supported 
yet"
+                        + " for text input format of Hive. ");
                 }
             }
         }
-
         if (hmsTable.isHiveTransactionalTable()) {
             this.hiveTransaction = new 
HiveTransaction(DebugUtil.printId(ConnectContext.get().queryId()),
                     ConnectContext.get().getQualifiedUser(), hmsTable, 
hmsTable.isFullAcidTable());
@@ -310,4 +310,19 @@ public class HiveScanNode extends FileQueryScanNode {
         }
         params.setSlotNameToSchemaPos(columnNameToPosition);
     }
+
+    /**
+     * Decides whether the given aggregate call qualifies for the no-grouping
+     * aggregate push-down on this scan node. Only a function whose name is
+     * COUNT (compared case-insensitively) is accepted.
+     *
+     * @param aggExpr the aggregate function call under consideration
+     * @return true when the function name is COUNT, false for any other name
+     */
+    @Override
+    public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) {
+        final String fnName = aggExpr.getFnName().getFunction();
+        return fnName.equalsIgnoreCase("COUNT");
+    }
+
+    /**
+     * Per-column check for the no-grouping aggregate push-down: a column is
+     * accepted only when it does not allow NULL values.
+     *
+     * @param aggExpr the aggregate function call (not consulted by this implementation)
+     * @param col the column to check
+     * @return true if {@code col} does not allow NULL, false otherwise
+     */
+    @Override
+    public boolean pushDownAggNoGroupingCheckCol(FunctionCallExpr aggExpr, Column col) {
+        final boolean nullable = col.isAllowNull();
+        return !nullable;
+    }
 }
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 05f9f0972d..9318b593a3 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -636,7 +636,7 @@ struct TOlapScanNode {
   // It's limit for scanner instead of scanNode so we add a new limit.
   10: optional i64 sort_limit
   11: optional bool enable_unique_key_merge_on_write
-  12: optional TPushAggOp push_down_agg_type_opt
+  12: optional TPushAggOp push_down_agg_type_opt // Deprecated: superseded by TPlanNode.push_down_agg_type_opt
   13: optional bool use_topn_opt
   14: optional list<Descriptors.TOlapTableIndex> indexes_desc
   15: optional set<i32> output_column_unique_ids
@@ -1142,6 +1142,8 @@ struct TPlanNode {
   46: optional TNestedLoopJoinNode nested_loop_join_node
   47: optional TTestExternalScanNode test_external_scan_node
 
+  48: optional TPushAggOp push_down_agg_type_opt
+  
   101: optional list<Exprs.TExpr> projections
   102: optional Types.TTupleId output_tuple_id
   103: optional TPartitionSortNode partition_sort_node
diff --git 
a/regression-test/data/external_table_emr_p2/hive/test_select_count_optimize.out
 
b/regression-test/data/external_table_emr_p2/hive/test_select_count_optimize.out
new file mode 100644
index 0000000000..9f76685f65
--- /dev/null
+++ 
b/regression-test/data/external_table_emr_p2/hive/test_select_count_optimize.out
@@ -0,0 +1,157 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+0      ALGERIA 0        haggle. carefully final deposits detect slyly agai
+1      ARGENTINA       1       al foxes promise slyly according to the regular 
accounts. bold requests alon
+2      BRAZIL  1       y alongside of the pending deposits. carefully special 
packages are about the ironic forges. slyly special 
+3      CANADA  1       eas hang ironic, silent packages. slyly regular 
packages are furiously over the tithes. fluffily bold
+18     CHINA   2       c dependencies. furiously express notornis sleep slyly 
regular accounts. ideas sleep. depos
+4      EGYPT   4       y above the carefully unusual theodolites. final 
dugouts are quickly across the furiously regular d
+5      ETHIOPIA        0       ven packages wake quickly. regu
+6      FRANCE  3       refully final requests. regular, ironi
+7      GERMANY 3       l platelets. regular accounts x-ray: unusual, regular 
acco
+8      INDIA   2       ss excuses cajole slyly across the packages. deposits 
print aroun
+9      INDONESIA       2        slyly express asymptotes. regular deposits 
haggle slyly. carefully ironic hockey players sleep blithely. carefull
+10     IRAN    4       efully alongside of the slyly final dependencies. 
+11     IRAQ    4       nic deposits boost atop the quickly final requests? 
quickly regula
+12     JAPAN   2       ously. final, express gifts cajole a
+13     JORDAN  4       ic deposits are blithely about the carefully regular pa
+14     KENYA   0        pending excuses haggle furiously deposits. pending, 
express pinto beans wake fluffily past t
+15     MOROCCO 0       rns. blithely bold courts among the closely regular 
packages use furiously bold platelets?
+16     MOZAMBIQUE      0       s. ironic, unusual asymptotes wake blithely r
+17     PERU    1       platelets. blithely pending dependencies use fluffily 
across the even pinto beans. carefully silent accoun
+19     ROMANIA 3       ular asymptotes are about the furious multipliers. 
express dependencies nag above the ironically ironic account
+22     RUSSIA  3        requests against the platelets use never according to 
the quickly regular pint
+20     SAUDI ARABIA    4       ts. silent requests haggle. closely express 
packages sleep across the blithely
+23     UNITED KINGDOM  3       eans boost carefully special requests. accounts 
are. carefull
+24     UNITED STATES   1       y final packages. slow foxes cajole quickly. 
quickly silent platelets breach ironic accounts. unusual pinto be
+21     VIETNAM 2       hely enticingly express accounts. even, final 
+
+-- !sql --
+25
+
+-- !sql --
+25
+
+-- !sql --
+0
+
+-- !sql --
+5
+
+-- !sql --
+4
+
+-- !sql --
+0
+
+-- !sql --
+5
+5
+5
+5
+5
+
+-- !sql --
+5999989709
+
+-- !sql --
+200000000
+
+-- !sql --
+200000000
+
+-- !sql --
+3995860
+3996114
+3997119
+3997177
+3997193
+3997623
+3998060
+3998197
+3998199
+3998205
+3998246
+3998259
+3998308
+3998860
+3998903
+3999137
+3999286
+3999411
+3999441
+3999477
+3999643
+3999670
+3999687
+3999830
+4000095
+4000151
+4000164
+4000268
+4000572
+4000594
+4000664
+4000672
+4000711
+4001091
+4001127
+4001273
+4001351
+4001463
+4001520
+4001568
+4001718
+4001940
+4001942
+4002064
+4002067
+4002305
+4002815
+4002966
+4003245
+4003749
+
+-- !sql --
+3999286
+
+-- !sql --
+210000000
+
+-- !sql --
+1
+
+-- !sql --
+200000000
+
+-- !sql --
+200000000
+
+-- !sql --
+3995860
+3996114
+3997119
+
+-- !sql --
+210000000
+
+-- !sql --
+ALGERIA
+ARGENTINA
+
+-- !sql --
+25
+
+-- !sql --
+0
+
+-- !sql --
+1
+
+-- !sql --
+5
+5
+5
+5
+5
+
diff --git a/regression-test/data/performance_p0/redundant_conjuncts.out 
b/regression-test/data/performance_p0/redundant_conjuncts.out
index 3baa5b3d93..f82e0c9453 100644
--- a/regression-test/data/performance_p0/redundant_conjuncts.out
+++ b/regression-test/data/performance_p0/redundant_conjuncts.out
@@ -12,6 +12,7 @@ PLAN FRAGMENT 0
      PREDICATES: `k1` = 1
      partitions=0/1, tablets=0/0, tabletList=
      cardinality=0, avgRowSize=8.0, numNodes=1
+     pushAggOp=NONE
 
 -- !redundant_conjuncts_gnerated_by_extract_common_filter --
 PLAN FRAGMENT 0
@@ -26,4 +27,5 @@ PLAN FRAGMENT 0
      PREDICATES: `k1` = 1 OR `k1` = 2
      partitions=0/1, tablets=0/0, tabletList=
      cardinality=0, avgRowSize=8.0, numNodes=1
+     pushAggOp=NONE
 
diff --git 
a/regression-test/suites/external_table_emr_p2/hive/test_select_count_optimize.groovy
 
b/regression-test/suites/external_table_emr_p2/hive/test_select_count_optimize.groovy
new file mode 100644
index 0000000000..2a95dc4294
--- /dev/null
+++ 
b/regression-test/suites/external_table_emr_p2/hive/test_select_count_optimize.groovy
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Regression suite for the select-count optimization on Hive external tables.
+// It runs count/min/max queries (plain, filtered and grouped) against parquet,
+// orc and tpch_1000 tables through an HMS catalog.
+// The expected-output file holds one `-- !sql --` section per qt_sql call, in the
+// same order as the calls below, so keep the statement order in sync with it.
+suite("test_select_count_optimize", "p2") {
+    // The whole suite is a no-op unless enableExternalHiveTest is set to "true".
+    String enabled = context.config.otherConfigs.get("enableExternalHiveTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String extHiveHmsHost = 
context.config.otherConfigs.get("extHiveHmsHost")
+        String extHiveHmsPort = 
context.config.otherConfigs.get("extHiveHmsPort")
+        String catalog_name = "test_select_count_optimize"
+        // Drop and recreate the HMS catalog pointing at the configured metastore.
+        sql """drop catalog if exists ${catalog_name};"""
+        sql """
+            create catalog if not exists ${catalog_name} properties (
+                'type'='hms',
+                'hadoop.username' = 'hadoop',
+                'hive.metastore.uris' = 
'thrift://${extHiveHmsHost}:${extHiveHmsPort}'
+            );
+        """
+        logger.info("catalog " + catalog_name + " created")
+        sql """switch ${catalog_name};"""
+        logger.info("switched to catalog " + catalog_name)
+        // Raise query_timeout for the large-table counts below (e.g. lineitem).
+        sql """ set query_timeout=3600; """ 
+
+        // Parquet-format tables
+        qt_sql """ select * from tpch_1000_parquet.nation order by 
n_name,n_regionkey,n_nationkey,n_comment ; """
+
+        qt_sql """ select count(*) from tpch_1000_parquet.nation; """
+
+        // count() over a non-null constant and over NULL.
+        qt_sql """ select count(1024) from tpch_1000_parquet.nation; """
+        
+        qt_sql """ select count(null) from tpch_1000_parquet.nation; """
+
+
+        // count with a predicate, then min/max and a grouped count.
+        qt_sql """ select count(*) from tpch_1000_parquet.nation where 
n_regionkey = 0; """
+
+        qt_sql """ select max(n_regionkey) from  tpch_1000_parquet.nation ;""" 
+        
+        qt_sql """ select min(n_regionkey) from  tpch_1000_parquet.nation ; 
""" 
+
+        qt_sql """ select count(*) as a from tpch_1000_parquet.nation group by 
n_regionkey order by a; """ 
+        
+        qt_sql """ select count(*) from  tpch_1000_parquet.lineitem; """  
+
+        qt_sql """ select count(*) from  tpch_1000_parquet.part; """  
+        
+        qt_sql """ select count(p_partkey) from tpch_1000_parquet.part; """  
+
+        qt_sql """ select count(*) as sz from tpch_1000_parquet.part group by 
p_size order by  sz ;""" 
+
+        qt_sql """ select count(*) from tpch_1000_parquet.part where p_size = 
1; """ 
+
+        qt_sql """ select count(*) from   
user_profile.hive_hll_user_profile_wide_table_parquet; """;
+
+        // ORC-format tables
+        qt_sql """ select count(*) from   tpch_1000_orc.part where 
p_partkey=1; """ 
+        
+        qt_sql """ select max(p_partkey) from   tpch_1000_orc.part ; """ 
+
+        qt_sql """ select count(p_comment) from tpch_1000_orc.part; """ 
+
+        qt_sql """ select count(*) as a from tpch_1000_orc.part group by 
p_size order by a limit 3 ; """ 
+
+        qt_sql """ select count(*) from 
user_profile.hive_hll_user_profile_wide_table_orc ; """ ; 
+
+        // Other: tables from the tpch_1000 database
+        qt_sql """ select n_name from tpch_1000.nation order by n_name limit 
2; """ 
+
+        qt_sql """ select count(*) from tpch_1000.nation; """ 
+        
+        qt_sql """ select min(n_regionkey) from  tpch_1000.nation ; """ 
+
+        qt_sql """ select count(*) from tpch_1000.nation where 
n_nationkey=5;"""
+
+        qt_sql """ select count(*) as a from tpch_1000.nation group by 
n_regionkey order by a;""" 
+        
+    }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Reply via email to