This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new ae83a838d06 [nereids] config global partition topn (#31476) ae83a838d06 is described below commit ae83a838d06c6e4a7db03594cb968bf8e118199b Author: xzj7019 <131111794+xzj7...@users.noreply.github.com> AuthorDate: Wed Feb 28 18:47:11 2024 +0800 [nereids] config global partition topn (#31476) * [nereids] config global partition topn * [nereids] config global partition topn --------- Co-authored-by: zhongjian.xzj <zhongjian.xzj@zhongjianxzjdeMacBook-Pro.local> --- ...ogicalPartitionTopNToPhysicalPartitionTopN.java | 70 +++++++++++++++++++- .../java/org/apache/doris/qe/SessionVariable.java | 13 ++++ .../limit_push_down/limit_push_down.out | 2 +- .../push_filter_through_ptopn.out | 2 +- .../noStatsRfPrune/query67.out | 2 +- .../no_stats_shape/query67.out | 2 +- .../explain/test_global_partition_topn_plan.groovy | 76 ++++++++++++++++++++++ 7 files changed, 162 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java index 2b6fcfe464d..410e68f3ec2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java @@ -17,18 +17,26 @@ package org.apache.doris.nereids.rules.implementation; +import org.apache.doris.nereids.memo.Group; import org.apache.doris.nereids.properties.OrderKey; import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.OrderExpression; +import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.plans.PartitionTopnPhase; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.statistics.ColumnStatistic; +import org.apache.doris.statistics.Statistics; import com.google.common.collect.ImmutableList; import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; /** * Implementation rule that convert logical partition-top-n to physical partition-top-n. @@ -42,7 +50,8 @@ public class LogicalPartitionTopNToPhysicalPartitionTopN extends OneImplementati private List<PhysicalPartitionTopN<? extends Plan>> generatePhysicalPartitionTopn( LogicalPartitionTopN<? extends Plan> logicalPartitionTopN) { - if (logicalPartitionTopN.getPartitionKeys().isEmpty()) { + if (logicalPartitionTopN.getPartitionKeys().isEmpty() + || !checkTwoPhaseGlobalPartitionTopn(logicalPartitionTopN)) { // if no partition by keys, use local partition topn combined with further full sort List<OrderKey> orderKeys = !logicalPartitionTopN.getOrderKeys().isEmpty() ? logicalPartitionTopN.getOrderKeys().stream() @@ -99,6 +108,65 @@ public class LogicalPartitionTopNToPhysicalPartitionTopN extends OneImplementati } } + /** + * check if partition keys' ndv is almost near the total row count. + * if yes, it is not suitable for two phase global partition topn. + */ + private boolean checkTwoPhaseGlobalPartitionTopn(LogicalPartitionTopN<? extends Plan> logicalPartitionTopN) { + double globalPartitionTopnThreshold = ConnectContext.get().getSessionVariable() + .getGlobalPartitionTopNThreshold(); + if (logicalPartitionTopN.getGroupExpression().isPresent()) { + Group group = logicalPartitionTopN.getGroupExpression().get().getOwnerGroup(); + if (group != null && group.getStatistics() != null) { + Statistics stats = group.getStatistics(); + double rowCount = stats.getRowCount(); + List<Expression> partitionKeys = logicalPartitionTopN.getPartitionKeys(); + if (!checkPartitionKeys(partitionKeys)) { + return false; + } + List<ColumnStatistic> partitionByKeyStats = partitionKeys.stream() + .map(partitionKey -> stats.findColumnStatistics(partitionKey)) + .filter(Objects::nonNull) + .filter(e -> !e.isUnKnown) + .collect(Collectors.toList()); + if (partitionByKeyStats.size() != partitionKeys.size()) { + return false; + } else { + List<Double> ndvs = partitionByKeyStats.stream().map(s -> s.ndv) + .filter(e -> e > 0 && !Double.isInfinite(e)) + .collect(Collectors.toList()); + if (ndvs.size() != partitionByKeyStats.size()) { + return false; + } else { + double maxNdv = ndvs.stream().max(Double::compare).get(); + return rowCount / maxNdv >= globalPartitionTopnThreshold; + } + } + } else { + return false; + } + } else { + return false; + } + } + + /** + * global partition topn only take effect if partition keys are columns from basic table + */ + private boolean checkPartitionKeys(List<Expression> partitionKeys) { + for (Expression expr : partitionKeys) { + if (!(expr instanceof SlotReference)) { + return false; + } else { + SlotReference slot = (SlotReference) expr; + if (!slot.getColumn().isPresent() || !slot.getTable().isPresent()) { + return false; + } + } + } + return true; + } + private ImmutableList<OrderKey> getAllOrderKeys(LogicalPartitionTopN<? extends Plan> logicalPartitionTopN) { ImmutableList.Builder<OrderKey> builder = ImmutableList.builder(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 6c0fecf0d34..046d6d1a92e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -209,6 +209,8 @@ public class SessionVariable implements Serializable, Writable { public static final String MAX_JOIN_NUMBER_BUSHY_TREE = "max_join_number_bushy_tree"; public static final String ENABLE_PARTITION_TOPN = "enable_partition_topn"; + public static final String GLOBAL_PARTITION_TOPN_THRESHOLD = "global_partition_topn_threshold"; + public static final String ENABLE_INFER_PREDICATE = "enable_infer_predicate"; public static final long DEFAULT_INSERT_VISIBLE_TIMEOUT_MS = 10_000; @@ -1027,6 +1029,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_PARTITION_TOPN) private boolean enablePartitionTopN = true; + @VariableMgr.VarAttr(name = GLOBAL_PARTITION_TOPN_THRESHOLD) + private double globalPartitionTopNThreshold = 100; + @VariableMgr.VarAttr(name = ENABLE_INFER_PREDICATE) private boolean enableInferPredicate = true; @@ -2533,6 +2538,14 @@ public class SessionVariable implements Serializable, Writable { this.enablePartitionTopN = enablePartitionTopN; } + public double getGlobalPartitionTopNThreshold() { + return globalPartitionTopNThreshold; + } + + public void setGlobalPartitionTopnThreshold(int threshold) { + this.globalPartitionTopNThreshold = threshold; + } + public boolean isEnableFoldNondeterministicFn() { return enableFoldNondeterministicFn; } diff --git a/regression-test/data/nereids_rules_p0/limit_push_down/limit_push_down.out b/regression-test/data/nereids_rules_p0/limit_push_down/limit_push_down.out index 31ffeee07dd..cbf8a09c7e6 100644 --- a/regression-test/data/nereids_rules_p0/limit_push_down/limit_push_down.out +++ b/regression-test/data/nereids_rules_p0/limit_push_down/limit_push_down.out @@ -389,7 +389,7 @@ PhysicalResultSink --PhysicalLimit[GLOBAL] ----PhysicalLimit[LOCAL] ------PhysicalWindow ---------PhysicalPartitionTopN +--------PhysicalQuickSort[LOCAL_SORT] ----------PhysicalPartitionTopN ------------hashAgg[GLOBAL] --------------hashAgg[LOCAL] diff --git a/regression-test/data/nereids_syntax_p0/push_filter_through_ptopn.out b/regression-test/data/nereids_syntax_p0/push_filter_through_ptopn.out index e37f73ef793..dd23fd8ea50 100644 --- a/regression-test/data/nereids_syntax_p0/push_filter_through_ptopn.out +++ b/regression-test/data/nereids_syntax_p0/push_filter_through_ptopn.out @@ -44,7 +44,7 @@ PhysicalResultSink ----PhysicalProject ------filter((T.b = 2) and (rn <= 2)) --------PhysicalWindow -----------PhysicalPartitionTopN +----------PhysicalQuickSort[LOCAL_SORT] ------------PhysicalDistribute[DistributionSpecHash] --------------PhysicalPartitionTopN ----------------PhysicalProject diff --git a/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query67.out b/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query67.out index bd3f6458a3c..246d2337283 100644 --- a/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query67.out +++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query67.out @@ -6,7 +6,7 @@ PhysicalResultSink ------PhysicalTopN[LOCAL_SORT] --------filter((rk <= 100)) ----------PhysicalWindow -------------PhysicalPartitionTopN +------------PhysicalQuickSort[LOCAL_SORT] --------------PhysicalDistribute[DistributionSpecHash] ----------------PhysicalPartitionTopN ------------------PhysicalProject diff --git a/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query67.out b/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query67.out index d069db662b3..900fe97ff05 100644 --- a/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query67.out +++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query67.out @@ -6,7 +6,7 @@ PhysicalResultSink ------PhysicalTopN[LOCAL_SORT] --------filter((rk <= 100)) ----------PhysicalWindow -------------PhysicalPartitionTopN +------------PhysicalQuickSort[LOCAL_SORT] --------------PhysicalDistribute[DistributionSpecHash] ----------------PhysicalPartitionTopN ------------------PhysicalProject diff --git a/regression-test/suites/nereids_p0/explain/test_global_partition_topn_plan.groovy b/regression-test/suites/nereids_p0/explain/test_global_partition_topn_plan.groovy new file mode 100644 index 00000000000..7141ac9fce2 --- /dev/null +++ b/regression-test/suites/nereids_p0/explain/test_global_partition_topn_plan.groovy @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_global_partition_topn_plan") { + sql "SET enable_nereids_planner=true" + sql "SET enable_fallback_to_original_planner=false" + + sql "DROP TABLE IF EXISTS test_global_partition_topn_plan" + sql """ CREATE TABLE `test_global_partition_topn_plan` ( + c1 int, c2 int, c3 int + )ENGINE=OLAP + distributed by hash(c1) buckets 10 + properties( + "replication_allocation" = "tag.location.default: 1" + );""" + + sql """ alter table test_global_partition_topn_plan modify column c1 set stats('row_count'='52899687', 'ndv'='52899687', 'num_nulls'='0', 'min_value'='1', 'max_value'='52899687', 'data_size'='4'); """ + sql """ alter table test_global_partition_topn_plan modify column c2 set stats('row_count'='52899687', 'ndv'='23622730', 'num_nulls'='0', 'min_value'='1', 'max_value'='52899687', 'data_size'='4'); """ + sql """ alter table test_global_partition_topn_plan modify column c3 set stats('row_count'='52899687', 'ndv'='2', 'num_nulls'='0', 'min_value'='0', 'max_value'='1', 'data_size'='4'); """ + + sql "SET global_partition_topn_threshold=2" + explain { + sql("shape plan select rn from (select row_number() over (partition by c2 order by c3) as rn from test_global_partition_topn_plan) tmp where rn <= 100"); + contains"PhysicalPartitionTopN" + notContains"PhysicalQuickSort" + } + + sql "SET global_partition_topn_threshold=3" + explain { + sql("shape plan select rn from (select row_number() over (partition by c2 order by c3) as rn from test_global_partition_topn_plan) tmp where rn <= 100"); + contains"PhysicalPartitionTopN" + contains"PhysicalQuickSort" + } + + sql "SET global_partition_topn_threshold=100" + explain { + sql("shape plan select rn from (select row_number() over (partition by c3 order by c2) as rn from test_global_partition_topn_plan) tmp where rn <= 100"); + contains"PhysicalPartitionTopN" + notContains"PhysicalQuickSort" + } + + sql "SET global_partition_topn_threshold=2" + explain { + sql("shape plan select rn from (select row_number() over (partition by c2, c3 order by c1) as rn from test_global_partition_topn_plan) tmp where rn <= 100"); + contains"PhysicalPartitionTopN" + notContains"PhysicalQuickSort" + } + + sql "SET global_partition_topn_threshold=3" + explain { + sql("shape plan select rn from (select row_number() over (partition by c2, c3 order by c1) as rn from test_global_partition_topn_plan) tmp where rn <= 100"); + contains"PhysicalPartitionTopN" + contains"PhysicalQuickSort" + } + + sql "SET global_partition_topn_threshold=2" + explain { + sql("shape plan select rn from (select row_number() over (partition by c2 + c3 order by c1) as rn from test_global_partition_topn_plan) tmp where rn <= 100"); + contains"PhysicalPartitionTopN" + contains"PhysicalQuickSort" + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org