morrySnow commented on code in PR #61495:
URL: https://github.com/apache/doris/pull/61495#discussion_r3031959768
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggWithoutDistinct.java:
##########
@@ -165,6 +183,285 @@ public Void visitSessionVarGuardExpr(SessionVarGuardExpr
expr, Map<String, Strin
aggregate.getLogicalProperties(), localAgg));
}
+ /**
+ * Implements bucketed hash aggregation for single-BE deployments.
+ * Fuses two-phase aggregation into a single PhysicalBucketedHashAggregate
operator,
+ * eliminating exchange overhead and serialization/deserialization costs.
+ *
+ * Only generated when:
+ * 1. enable_bucketed_hash_agg session variable is true
+ * 2. Cluster has exactly one alive BE
+ * 3. Aggregate has GROUP BY keys (no without-key aggregation)
+ * 4. Aggregate functions support two-phase execution
+ * 5. Data volume checks pass (min input rows, max group keys)
+ */
+ private List<Plan> implementBucketedPhase(LogicalAggregate<? extends Plan>
aggregate, ConnectContext ctx) {
+ if (!ctx.getSessionVariable().enableBucketedHashAgg) {
+ return ImmutableList.of();
+ }
+ // Only for single-BE deployments
+ int beNumber = ctx.getEnv().getClusterInfo().getBackendsNumber(true);
+ if (beNumber != 1) {
+ return ImmutableList.of();
+ }
+ // Without-key aggregation not supported in initial version
+ if (aggregate.getGroupByExpressions().isEmpty()) {
+ return ImmutableList.of();
+ }
+ // Must support two-phase execution (same check as splitTwoPhase)
+ if (!aggregate.supportAggregatePhase(AggregatePhase.TWO)) {
+ return ImmutableList.of();
+ }
+ // Skip aggregates with no aggregate functions (pure GROUP BY dedup).
+ // These are produced by DistinctAggregateRewriter as the bottom dedup
phase.
+ if (aggregate.getAggregateFunctions().isEmpty()) {
+ return ImmutableList.of();
+ }
+ // Skip aggregates containing multi-distinct functions (e.g.,
multi_distinct_count,
+ // multi_distinct_sum). These are semantically distinct aggregations
rewritten by
+ // DistinctAggregateRewriter — they embed deduplication in the
BE-level function.
+ // The bucketed agg cost model does not account for deduplication
overhead, which
+ // causes the base-table bucketed path to appear artificially cheap
compared to
+ // materialized views using pre-aggregated bitmap_union/hll_union.
+ for (AggregateFunction func : aggregate.getAggregateFunctions()) {
+ if (func instanceof MultiDistinction) {
+ return ImmutableList.of();
+ }
+ }
+ // Skip aggregates whose child group contains a LogicalAggregate.
+ // This detects the top aggregate in a DISTINCT decomposition (e.g.,
+ // COUNT(DISTINCT a) GROUP BY b is rewritten to COUNT(a) GROUP BY b
+ // on top of GROUP BY a,b dedup). Bucketed agg does not support
+ // DISTINCT aggregation in the initial version.
+ if (childGroupContainsAggregate(aggregate)) {
+ return ImmutableList.of();
+ }
+ // Skip when sortByGroupKey optimization applies. This is detected by
+ // checking if the aggregate's owner group has a LogicalTopN parent
+ // whose order key expressions equal the group-by keys (produced by
+ // LimitAggToTopNAgg rewrite). PhysicalBucketedHashAggregate does not
+ // support sortByGroupKey, so we yield to the regular hash-agg plan.
+ if (hasSortByGroupKeyTopN(aggregate)) {
+ return ImmutableList.of();
+ }
+ // Skip when data is already distributed by the GROUP BY keys
+ // (e.g., table bucketed by UserID, query GROUP BY UserID).
+ // In this case the two-phase plan needs no exchange and is strictly
+ // better than bucketed agg (no 256-bucket overhead, no merge phase).
+ if (groupByKeysSatisfyDistribution(aggregate)) {
+ return ImmutableList.of();
+ }
+ // Data-volume-based checks: control bucketed agg eligibility based on
+ // estimated data scale, similar to ClickHouse's
group_by_two_level_threshold
+ // and group_by_two_level_threshold_bytes. This reduces reliance on
+ // column-level statistics which may be inaccurate or missing.
+ //
+ // When statistics are unavailable (groupExpression absent or
childStats null),
+ // conservatively skip bucketed agg — without data volume information
we cannot
+ // make an informed decision, and the risk of choosing bucketed agg in
a
+ // high-cardinality scenario outweighs the potential benefit.
+ if (!aggregate.getGroupExpression().isPresent()) {
+ return ImmutableList.of();
+ }
+ GroupExpression ge = aggregate.getGroupExpression().get();
+ Statistics childStats = ge.childStatistics(0);
+ if (childStats == null) {
+ return ImmutableList.of();
+ }
+ double rows = childStats.getRowCount();
+ long minInputRows = ctx.getSessionVariable().bucketedAggMinInputRows;
+ long maxGroupKeys = ctx.getSessionVariable().bucketedAggMaxGroupKeys;
+
+ // Gate: minimum input rows.
+ // When input data is too small, the overhead of initializing 256
+ // per-bucket hash tables and the pipelined merge phase outweighs
+ // the benefit of eliminating exchange. Skip bucketed agg.
+ if (minInputRows > 0 && rows < minInputRows) {
+ return ImmutableList.of();
+ }
+
+ // Gate: maximum estimated group keys (similar to ClickHouse's
+ // group_by_two_level_threshold). When the number of distinct groups
+ // is too large, the source-side merge must combine too many keys
+ // across instances, and the merge cost dominates. Skip bucketed agg.
+ Statistics aggStats = ge.getOwnerGroup().getStatistics();
+ if (maxGroupKeys > 0 && aggStats != null && aggStats.getRowCount() >
maxGroupKeys) {
+ return ImmutableList.of();
+ }
+
+ // High-cardinality ratio checks (existing logic).
+ // These complement the absolute thresholds above with relative checks:
+ // 1. Single-column NDV check: if ANY GROUP BY key's NDV > rows *
threshold,
+ // the combined NDV is at least that high.
+ // 2. Aggregation ratio check: if estimated output rows > rows *
threshold,
+ // merge cost dominates.
+ double highCardThreshold = 0.3;
Review Comment:
Why is this hard-coded threshold (0.3) not a SessionVariable, so it can be tuned without a code change?
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggWithoutDistinct.java:
##########
@@ -165,6 +183,285 @@ public Void visitSessionVarGuardExpr(SessionVarGuardExpr
expr, Map<String, Strin
aggregate.getLogicalProperties(), localAgg));
}
+ /**
+ * Implements bucketed hash aggregation for single-BE deployments.
+ * Fuses two-phase aggregation into a single PhysicalBucketedHashAggregate
operator,
+ * eliminating exchange overhead and serialization/deserialization costs.
+ *
+ * Only generated when:
+ * 1. enable_bucketed_hash_agg session variable is true
+ * 2. Cluster has exactly one alive BE
+ * 3. Aggregate has GROUP BY keys (no without-key aggregation)
+ * 4. Aggregate functions support two-phase execution
+ * 5. Data volume checks pass (min input rows, max group keys)
+ */
+ private List<Plan> implementBucketedPhase(LogicalAggregate<? extends Plan>
aggregate, ConnectContext ctx) {
+ if (!ctx.getSessionVariable().enableBucketedHashAgg) {
+ return ImmutableList.of();
+ }
+ // Only for single-BE deployments
+ int beNumber = ctx.getEnv().getClusterInfo().getBackendsNumber(true);
+ if (beNumber != 1) {
+ return ImmutableList.of();
+ }
+ // Without-key aggregation not supported in initial version
+ if (aggregate.getGroupByExpressions().isEmpty()) {
+ return ImmutableList.of();
+ }
+ // Must support two-phase execution (same check as splitTwoPhase)
+ if (!aggregate.supportAggregatePhase(AggregatePhase.TWO)) {
+ return ImmutableList.of();
+ }
+ // Skip aggregates with no aggregate functions (pure GROUP BY dedup).
+ // These are produced by DistinctAggregateRewriter as the bottom dedup
phase.
+ if (aggregate.getAggregateFunctions().isEmpty()) {
+ return ImmutableList.of();
+ }
+ // Skip aggregates containing multi-distinct functions (e.g.,
multi_distinct_count,
+ // multi_distinct_sum). These are semantically distinct aggregations
rewritten by
+ // DistinctAggregateRewriter — they embed deduplication in the
BE-level function.
+ // The bucketed agg cost model does not account for deduplication
overhead, which
+ // causes the base-table bucketed path to appear artificially cheap
compared to
+ // materialized views using pre-aggregated bitmap_union/hll_union.
+ for (AggregateFunction func : aggregate.getAggregateFunctions()) {
+ if (func instanceof MultiDistinction) {
+ return ImmutableList.of();
+ }
+ }
+ // Skip aggregates whose child group contains a LogicalAggregate.
+ // This detects the top aggregate in a DISTINCT decomposition (e.g.,
+ // COUNT(DISTINCT a) GROUP BY b is rewritten to COUNT(a) GROUP BY b
+ // on top of GROUP BY a,b dedup). Bucketed agg does not support
+ // DISTINCT aggregation in the initial version.
+ if (childGroupContainsAggregate(aggregate)) {
+ return ImmutableList.of();
+ }
+ // Skip when sortByGroupKey optimization applies. This is detected by
+ // checking if the aggregate's owner group has a LogicalTopN parent
+ // whose order key expressions equal the group-by keys (produced by
+ // LimitAggToTopNAgg rewrite). PhysicalBucketedHashAggregate does not
+ // support sortByGroupKey, so we yield to the regular hash-agg plan.
+ if (hasSortByGroupKeyTopN(aggregate)) {
+ return ImmutableList.of();
+ }
+ // Skip when data is already distributed by the GROUP BY keys
+ // (e.g., table bucketed by UserID, query GROUP BY UserID).
+ // In this case the two-phase plan needs no exchange and is strictly
+ // better than bucketed agg (no 256-bucket overhead, no merge phase).
+ if (groupByKeysSatisfyDistribution(aggregate)) {
+ return ImmutableList.of();
+ }
+ // Data-volume-based checks: control bucketed agg eligibility based on
+ // estimated data scale, similar to ClickHouse's
group_by_two_level_threshold
+ // and group_by_two_level_threshold_bytes. This reduces reliance on
+ // column-level statistics which may be inaccurate or missing.
+ //
+ // When statistics are unavailable (groupExpression absent or
childStats null),
+ // conservatively skip bucketed agg — without data volume information
we cannot
+ // make an informed decision, and the risk of choosing bucketed agg in
a
+ // high-cardinality scenario outweighs the potential benefit.
+ if (!aggregate.getGroupExpression().isPresent()) {
+ return ImmutableList.of();
+ }
+ GroupExpression ge = aggregate.getGroupExpression().get();
+ Statistics childStats = ge.childStatistics(0);
+ if (childStats == null) {
+ return ImmutableList.of();
+ }
+ double rows = childStats.getRowCount();
+ long minInputRows = ctx.getSessionVariable().bucketedAggMinInputRows;
+ long maxGroupKeys = ctx.getSessionVariable().bucketedAggMaxGroupKeys;
+
+ // Gate: minimum input rows.
+ // When input data is too small, the overhead of initializing 256
+ // per-bucket hash tables and the pipelined merge phase outweighs
+ // the benefit of eliminating exchange. Skip bucketed agg.
+ if (minInputRows > 0 && rows < minInputRows) {
+ return ImmutableList.of();
+ }
+
+ // Gate: maximum estimated group keys (similar to ClickHouse's
+ // group_by_two_level_threshold). When the number of distinct groups
+ // is too large, the source-side merge must combine too many keys
+ // across instances, and the merge cost dominates. Skip bucketed agg.
+ Statistics aggStats = ge.getOwnerGroup().getStatistics();
+ if (maxGroupKeys > 0 && aggStats != null && aggStats.getRowCount() >
maxGroupKeys) {
+ return ImmutableList.of();
+ }
+
+ // High-cardinality ratio checks (existing logic).
+ // These complement the absolute thresholds above with relative checks:
+ // 1. Single-column NDV check: if ANY GROUP BY key's NDV > rows *
threshold,
+ // the combined NDV is at least that high.
+ // 2. Aggregation ratio check: if estimated output rows > rows *
threshold,
+ // merge cost dominates.
+ double highCardThreshold = 0.3;
+ for (Expression groupByKey : aggregate.getGroupByExpressions()) {
+ ColumnStatistic colStat =
childStats.findColumnStatistics(groupByKey);
+ if (colStat != null && !colStat.isUnKnown() && colStat.ndv > rows
* highCardThreshold) {
+ return ImmutableList.of();
+ }
+ }
+ if (aggStats != null && aggStats.getRowCount() > rows *
highCardThreshold) {
+ return ImmutableList.of();
+ }
+ // Build output expressions: rewrite AggregateFunction ->
AggregateExpression with GLOBAL_RESULT param
+ // (same as one-phase aggregation — raw input directly produces final
result).
+ List<NamedExpression> aggOutput =
ExpressionUtils.rewriteDownShortCircuit(
+ aggregate.getOutputExpressions(), expr -> {
+ if (!(expr instanceof AggregateFunction)) {
+ return expr;
+ }
+ return new AggregateExpression((AggregateFunction) expr,
AggregateParam.GLOBAL_RESULT);
+ }
+ );
+ return ImmutableList.of(new PhysicalBucketedHashAggregate<>(
+ aggregate.getGroupByExpressions(), aggOutput,
+ aggregate.getLogicalProperties(), aggregate.child()));
+ }
+
+ /**
+ * Check if the child group of this aggregate contains a LogicalAggregate.
+ * This is used to detect aggregates produced by DISTINCT decomposition
rewrites
+ * (e.g., DistinctAggregateRewriter, SplitMultiDistinctStrategy), where
the original
+ * DISTINCT aggregate is split into a top non-distinct aggregate over a
bottom dedup aggregate.
+ */
+ private boolean childGroupContainsAggregate(LogicalAggregate<? extends
Plan> aggregate) {
+ if (!aggregate.getGroupExpression().isPresent()) {
+ return false;
+ }
+ GroupExpression groupExpr = aggregate.getGroupExpression().get();
+ if (groupExpr.arity() == 0) {
+ return false;
+ }
+ Group childGroup = groupExpr.child(0);
+ for (GroupExpression childGroupExpr :
childGroup.getLogicalExpressions()) {
+ if (childGroupExpr.getPlan() instanceof LogicalAggregate) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Check if a LogicalTopN parent exists whose order keys are identical to
+ * the aggregate's group-by keys. This means PushTopnToAgg will later set
+ * sortByGroupKey on PhysicalHashAggregate; bucketed agg doesn't support
+ * that optimization so we skip it.
+ *
+ * Handles both TopN->Agg and TopN->Project->Agg patterns.
+ */
+ private boolean hasSortByGroupKeyTopN(LogicalAggregate<? extends Plan>
aggregate) {
+ if (!aggregate.getGroupExpression().isPresent()) {
+ return false;
+ }
+ List<Expression> groupByKeys = aggregate.getGroupByExpressions();
+ Group ownerGroup =
aggregate.getGroupExpression().get().getOwnerGroup();
+ for (GroupExpression parentGE :
ownerGroup.getParentGroupExpressions()) {
+ Plan parentPlan = parentGE.getPlan();
+ if (parentPlan instanceof LogicalTopN
+ && orderKeysMatchGroupKeys((LogicalTopN<?>) parentPlan,
groupByKeys)) {
+ return true;
+ }
+ if (parentPlan instanceof LogicalProject &&
parentGE.getOwnerGroup() != null) {
+ for (GroupExpression gpGE :
parentGE.getOwnerGroup().getParentGroupExpressions()) {
+ if (gpGE.getPlan() instanceof LogicalTopN
+ && orderKeysMatchGroupKeys((LogicalTopN<?>)
gpGE.getPlan(), groupByKeys)) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ private boolean orderKeysMatchGroupKeys(LogicalTopN<?> topN,
List<Expression> groupByKeys) {
Review Comment:
This should reuse the existing order-key/group-key matching function in PushTopnToAgg instead of duplicating the logic here.
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggWithoutDistinct.java:
##########
@@ -165,6 +183,285 @@ public Void visitSessionVarGuardExpr(SessionVarGuardExpr
expr, Map<String, Strin
aggregate.getLogicalProperties(), localAgg));
}
+ /**
+ * Implements bucketed hash aggregation for single-BE deployments.
+ * Fuses two-phase aggregation into a single PhysicalBucketedHashAggregate
operator,
+ * eliminating exchange overhead and serialization/deserialization costs.
+ *
+ * Only generated when:
+ * 1. enable_bucketed_hash_agg session variable is true
+ * 2. Cluster has exactly one alive BE
+ * 3. Aggregate has GROUP BY keys (no without-key aggregation)
+ * 4. Aggregate functions support two-phase execution
+ * 5. Data volume checks pass (min input rows, max group keys)
+ */
+ private List<Plan> implementBucketedPhase(LogicalAggregate<? extends Plan>
aggregate, ConnectContext ctx) {
+ if (!ctx.getSessionVariable().enableBucketedHashAgg) {
+ return ImmutableList.of();
+ }
+ // Only for single-BE deployments
+ int beNumber = ctx.getEnv().getClusterInfo().getBackendsNumber(true);
+ if (beNumber != 1) {
+ return ImmutableList.of();
+ }
+ // Without-key aggregation not supported in initial version
+ if (aggregate.getGroupByExpressions().isEmpty()) {
+ return ImmutableList.of();
+ }
+ // Must support two-phase execution (same check as splitTwoPhase)
+ if (!aggregate.supportAggregatePhase(AggregatePhase.TWO)) {
+ return ImmutableList.of();
+ }
+ // Skip aggregates with no aggregate functions (pure GROUP BY dedup).
+ // These are produced by DistinctAggregateRewriter as the bottom dedup
phase.
+ if (aggregate.getAggregateFunctions().isEmpty()) {
+ return ImmutableList.of();
+ }
+ // Skip aggregates containing multi-distinct functions (e.g.,
multi_distinct_count,
+ // multi_distinct_sum). These are semantically distinct aggregations
rewritten by
+ // DistinctAggregateRewriter — they embed deduplication in the
BE-level function.
+ // The bucketed agg cost model does not account for deduplication
overhead, which
+ // causes the base-table bucketed path to appear artificially cheap
compared to
+ // materialized views using pre-aggregated bitmap_union/hll_union.
+ for (AggregateFunction func : aggregate.getAggregateFunctions()) {
+ if (func instanceof MultiDistinction) {
+ return ImmutableList.of();
+ }
+ }
+ // Skip aggregates whose child group contains a LogicalAggregate.
+ // This detects the top aggregate in a DISTINCT decomposition (e.g.,
+ // COUNT(DISTINCT a) GROUP BY b is rewritten to COUNT(a) GROUP BY b
+ // on top of GROUP BY a,b dedup). Bucketed agg does not support
+ // DISTINCT aggregation in the initial version.
+ if (childGroupContainsAggregate(aggregate)) {
+ return ImmutableList.of();
+ }
Review Comment:
I'm a little confused: even if the original aggregate was `DISTINCT`, by the time we
reach this point none of the aggregates are `DISTINCT` anymore, so this comment is misleading.
##########
fe/fe-core/src/main/java/org/apache/doris/planner/BucketedAggregationNode.java:
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.analysis.AggregateInfo;
+import org.apache.doris.analysis.ExprToThriftVisitor;
+import org.apache.doris.analysis.FunctionCallExpr;
+import org.apache.doris.thrift.TBucketedAggregationNode;
+import org.apache.doris.thrift.TExplainLevel;
+import org.apache.doris.thrift.TExpr;
+import org.apache.doris.thrift.TPlanNode;
+import org.apache.doris.thrift.TPlanNodeType;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Bucketed hash aggregation node.
+ *
+ * Fuses two-phase aggregation (local + global) into a single BE operator for
single-BE deployments.
+ * Produces a BUCKETED_AGGREGATION_NODE in the Thrift plan, which the BE maps
to
+ * BucketedAggSinkOperatorX / BucketedAggSourceOperatorX.
+ */
+public class BucketedAggregationNode extends PlanNode {
+ private final AggregateInfo aggInfo;
+ private final boolean needsFinalize;
+
+ public BucketedAggregationNode(PlanNodeId id, PlanNode input,
AggregateInfo aggInfo,
+ boolean needsFinalize) {
+ super(id, aggInfo.getOutputTupleId().asList(), "BUCKETED AGGREGATE");
+ this.aggInfo = aggInfo;
+ this.needsFinalize = needsFinalize;
+ this.children.add(input);
+ }
+
+ @Override
+ protected void toThrift(TPlanNode msg) {
+ msg.node_type = TPlanNodeType.BUCKETED_AGGREGATION_NODE;
+
+ List<TExpr> aggregateFunctions = Lists.newArrayList();
+ for (FunctionCallExpr e : aggInfo.getMaterializedAggregateExprs()) {
+ aggregateFunctions.add(ExprToThriftVisitor.treeToThrift(e));
+ }
+
+ List<TExpr> groupingExprs = Lists.newArrayList();
+ if (aggInfo.getGroupingExprs() != null) {
+ groupingExprs =
ExprToThriftVisitor.treesToThrift(aggInfo.getGroupingExprs());
+ }
+
+ TBucketedAggregationNode bucketedAggNode = new
TBucketedAggregationNode();
+ bucketedAggNode.setGroupingExprs(groupingExprs);
+ bucketedAggNode.setAggregateFunctions(aggregateFunctions);
+
bucketedAggNode.setIntermediateTupleId(aggInfo.getOutputTupleId().asInt());
Review Comment:
If setIntermediateTupleId and setOutputTupleId are always set to the same value, why not merge
them into a single field?
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggWithoutDistinct.java:
##########
@@ -165,6 +183,285 @@ public Void visitSessionVarGuardExpr(SessionVarGuardExpr
expr, Map<String, Strin
aggregate.getLogicalProperties(), localAgg));
}
+ /**
+ * Implements bucketed hash aggregation for single-BE deployments.
+ * Fuses two-phase aggregation into a single PhysicalBucketedHashAggregate
operator,
+ * eliminating exchange overhead and serialization/deserialization costs.
+ *
+ * Only generated when:
+ * 1. enable_bucketed_hash_agg session variable is true
+ * 2. Cluster has exactly one alive BE
+ * 3. Aggregate has GROUP BY keys (no without-key aggregation)
+ * 4. Aggregate functions support two-phase execution
+ * 5. Data volume checks pass (min input rows, max group keys)
+ */
+ private List<Plan> implementBucketedPhase(LogicalAggregate<? extends Plan>
aggregate, ConnectContext ctx) {
+ if (!ctx.getSessionVariable().enableBucketedHashAgg) {
+ return ImmutableList.of();
+ }
+ // Only for single-BE deployments
+ int beNumber = ctx.getEnv().getClusterInfo().getBackendsNumber(true);
+ if (beNumber != 1) {
+ return ImmutableList.of();
+ }
+ // Without-key aggregation not supported in initial version
+ if (aggregate.getGroupByExpressions().isEmpty()) {
+ return ImmutableList.of();
+ }
+ // Must support two-phase execution (same check as splitTwoPhase)
+ if (!aggregate.supportAggregatePhase(AggregatePhase.TWO)) {
+ return ImmutableList.of();
+ }
+ // Skip aggregates with no aggregate functions (pure GROUP BY dedup).
+ // These are produced by DistinctAggregateRewriter as the bottom dedup
phase.
+ if (aggregate.getAggregateFunctions().isEmpty()) {
+ return ImmutableList.of();
+ }
+ // Skip aggregates containing multi-distinct functions (e.g.,
multi_distinct_count,
+ // multi_distinct_sum). These are semantically distinct aggregations
rewritten by
+ // DistinctAggregateRewriter — they embed deduplication in the
BE-level function.
+ // The bucketed agg cost model does not account for deduplication
overhead, which
+ // causes the base-table bucketed path to appear artificially cheap
compared to
+ // materialized views using pre-aggregated bitmap_union/hll_union.
+ for (AggregateFunction func : aggregate.getAggregateFunctions()) {
+ if (func instanceof MultiDistinction) {
+ return ImmutableList.of();
+ }
+ }
+ // Skip aggregates whose child group contains a LogicalAggregate.
+ // This detects the top aggregate in a DISTINCT decomposition (e.g.,
+ // COUNT(DISTINCT a) GROUP BY b is rewritten to COUNT(a) GROUP BY b
+ // on top of GROUP BY a,b dedup). Bucketed agg does not support
+ // DISTINCT aggregation in the initial version.
+ if (childGroupContainsAggregate(aggregate)) {
+ return ImmutableList.of();
+ }
+ // Skip when sortByGroupKey optimization applies. This is detected by
+ // checking if the aggregate's owner group has a LogicalTopN parent
+ // whose order key expressions equal the group-by keys (produced by
+ // LimitAggToTopNAgg rewrite). PhysicalBucketedHashAggregate does not
+ // support sortByGroupKey, so we yield to the regular hash-agg plan.
+ if (hasSortByGroupKeyTopN(aggregate)) {
+ return ImmutableList.of();
+ }
+ // Skip when data is already distributed by the GROUP BY keys
+ // (e.g., table bucketed by UserID, query GROUP BY UserID).
+ // In this case the two-phase plan needs no exchange and is strictly
+ // better than bucketed agg (no 256-bucket overhead, no merge phase).
+ if (groupByKeysSatisfyDistribution(aggregate)) {
+ return ImmutableList.of();
+ }
+ // Data-volume-based checks: control bucketed agg eligibility based on
+ // estimated data scale, similar to ClickHouse's
group_by_two_level_threshold
+ // and group_by_two_level_threshold_bytes. This reduces reliance on
+ // column-level statistics which may be inaccurate or missing.
+ //
+ // When statistics are unavailable (groupExpression absent or
childStats null),
+ // conservatively skip bucketed agg — without data volume information
we cannot
+ // make an informed decision, and the risk of choosing bucketed agg in
a
+ // high-cardinality scenario outweighs the potential benefit.
+ if (!aggregate.getGroupExpression().isPresent()) {
+ return ImmutableList.of();
+ }
+ GroupExpression ge = aggregate.getGroupExpression().get();
+ Statistics childStats = ge.childStatistics(0);
+ if (childStats == null) {
+ return ImmutableList.of();
+ }
+ double rows = childStats.getRowCount();
+ long minInputRows = ctx.getSessionVariable().bucketedAggMinInputRows;
+ long maxGroupKeys = ctx.getSessionVariable().bucketedAggMaxGroupKeys;
+
+ // Gate: minimum input rows.
+ // When input data is too small, the overhead of initializing 256
+ // per-bucket hash tables and the pipelined merge phase outweighs
+ // the benefit of eliminating exchange. Skip bucketed agg.
+ if (minInputRows > 0 && rows < minInputRows) {
+ return ImmutableList.of();
+ }
+
+ // Gate: maximum estimated group keys (similar to ClickHouse's
+ // group_by_two_level_threshold). When the number of distinct groups
+ // is too large, the source-side merge must combine too many keys
+ // across instances, and the merge cost dominates. Skip bucketed agg.
+ Statistics aggStats = ge.getOwnerGroup().getStatistics();
+ if (maxGroupKeys > 0 && aggStats != null && aggStats.getRowCount() >
maxGroupKeys) {
+ return ImmutableList.of();
+ }
+
+ // High-cardinality ratio checks (existing logic).
+ // These complement the absolute thresholds above with relative checks:
+ // 1. Single-column NDV check: if ANY GROUP BY key's NDV > rows *
threshold,
+ // the combined NDV is at least that high.
+ // 2. Aggregation ratio check: if estimated output rows > rows *
threshold,
+ // merge cost dominates.
+ double highCardThreshold = 0.3;
+ for (Expression groupByKey : aggregate.getGroupByExpressions()) {
+ ColumnStatistic colStat =
childStats.findColumnStatistics(groupByKey);
+ if (colStat != null && !colStat.isUnKnown() && colStat.ndv > rows
* highCardThreshold) {
+ return ImmutableList.of();
+ }
+ }
+ if (aggStats != null && aggStats.getRowCount() > rows *
highCardThreshold) {
+ return ImmutableList.of();
+ }
+ // Build output expressions: rewrite AggregateFunction ->
AggregateExpression with GLOBAL_RESULT param
+ // (same as one-phase aggregation — raw input directly produces final
result).
+ List<NamedExpression> aggOutput =
ExpressionUtils.rewriteDownShortCircuit(
+ aggregate.getOutputExpressions(), expr -> {
+ if (!(expr instanceof AggregateFunction)) {
+ return expr;
+ }
+ return new AggregateExpression((AggregateFunction) expr,
AggregateParam.GLOBAL_RESULT);
+ }
+ );
+ return ImmutableList.of(new PhysicalBucketedHashAggregate<>(
+ aggregate.getGroupByExpressions(), aggOutput,
+ aggregate.getLogicalProperties(), aggregate.child()));
+ }
+
+ /**
+ * Check if the child group of this aggregate contains a LogicalAggregate.
+ * This is used to detect aggregates produced by DISTINCT decomposition
rewrites
+ * (e.g., DistinctAggregateRewriter, SplitMultiDistinctStrategy), where
the original
+ * DISTINCT aggregate is split into a top non-distinct aggregate over a
bottom dedup aggregate.
+ */
+ private boolean childGroupContainsAggregate(LogicalAggregate<? extends
Plan> aggregate) {
+ if (!aggregate.getGroupExpression().isPresent()) {
+ return false;
+ }
+ GroupExpression groupExpr = aggregate.getGroupExpression().get();
+ if (groupExpr.arity() == 0) {
+ return false;
+ }
+ Group childGroup = groupExpr.child(0);
+ for (GroupExpression childGroupExpr :
childGroup.getLogicalExpressions()) {
+ if (childGroupExpr.getPlan() instanceof LogicalAggregate) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Check if a LogicalTopN parent exists whose order keys are identical to
+ * the aggregate's group-by keys. This means PushTopnToAgg will later set
+ * sortByGroupKey on PhysicalHashAggregate; bucketed agg doesn't support
+ * that optimization so we skip it.
+ *
+ * Handles both TopN->Agg and TopN->Project->Agg patterns.
+ */
+ private boolean hasSortByGroupKeyTopN(LogicalAggregate<? extends Plan>
aggregate) {
+ if (!aggregate.getGroupExpression().isPresent()) {
+ return false;
+ }
+ List<Expression> groupByKeys = aggregate.getGroupByExpressions();
+ Group ownerGroup =
aggregate.getGroupExpression().get().getOwnerGroup();
+ for (GroupExpression parentGE :
ownerGroup.getParentGroupExpressions()) {
+ Plan parentPlan = parentGE.getPlan();
+ if (parentPlan instanceof LogicalTopN
+ && orderKeysMatchGroupKeys((LogicalTopN<?>) parentPlan,
groupByKeys)) {
+ return true;
+ }
+ if (parentPlan instanceof LogicalProject &&
parentGE.getOwnerGroup() != null) {
+ for (GroupExpression gpGE :
parentGE.getOwnerGroup().getParentGroupExpressions()) {
+ if (gpGE.getPlan() instanceof LogicalTopN
+ && orderKeysMatchGroupKeys((LogicalTopN<?>)
gpGE.getPlan(), groupByKeys)) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ private boolean orderKeysMatchGroupKeys(LogicalTopN<?> topN,
List<Expression> groupByKeys) {
+ List<OrderKey> orderKeys = topN.getOrderKeys();
+ if (orderKeys.size() != groupByKeys.size()) {
+ return false;
+ }
+ for (int i = 0; i < groupByKeys.size(); i++) {
+ if (!groupByKeys.get(i).equals(orderKeys.get(i).getExpr())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Check if the GROUP BY keys of this aggregate are a superset of (or
equal to)
+ * the underlying OlapTable's hash distribution columns. When this is true,
+ * the data is already correctly partitioned for the aggregation, so the
+ * two-phase plan (local + global) requires no exchange and is strictly
better
+ * than bucketed agg (no 256-bucket overhead, no merge phase).
+ *
+ * Traverses the child group in the Memo to find a LogicalOlapScan,
+ * walking through LogicalProject and LogicalFilter transparently.
+ */
+ private boolean groupByKeysSatisfyDistribution(LogicalAggregate<? extends
Plan> aggregate) {
+ if (!aggregate.getGroupExpression().isPresent()) {
+ return false;
+ }
+ GroupExpression groupExpr = aggregate.getGroupExpression().get();
+ if (groupExpr.arity() == 0) {
+ return false;
+ }
+ OlapTable table = findOlapTableInGroup(groupExpr.child(0), 5);
+ if (table == null) {
+ return false;
+ }
+ DistributionInfo distributionInfo = table.getDefaultDistributionInfo();
Review Comment:
This should use
`org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan#getDistributionSpec` instead of reading the table's default distribution info directly.
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java:
##########
@@ -504,6 +505,14 @@ public Void
visitPhysicalHashAggregate(PhysicalHashAggregate<? extends Plan> agg
return null;
}
+ @Override
+ public Void visitPhysicalBucketedHashAggregate(
Review Comment:
Please add a unit test for this method.
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java:
##########
@@ -504,6 +505,14 @@ public Void
visitPhysicalHashAggregate(PhysicalHashAggregate<? extends Plan> agg
return null;
}
+ @Override
+ public Void visitPhysicalBucketedHashAggregate(
Review Comment:
Please add a unit test for this method.
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggWithoutDistinct.java:
##########
@@ -165,6 +183,285 @@ public Void visitSessionVarGuardExpr(SessionVarGuardExpr
expr, Map<String, Strin
aggregate.getLogicalProperties(), localAgg));
}
+ /**
+ * Implements bucketed hash aggregation for single-BE deployments.
+ * Fuses two-phase aggregation into a single PhysicalBucketedHashAggregate
operator,
+ * eliminating exchange overhead and serialization/deserialization costs.
+ *
+ * Only generated when:
+ * 1. enable_bucketed_hash_agg session variable is true
+ * 2. Cluster has exactly one alive BE
+ * 3. Aggregate has GROUP BY keys (no without-key aggregation)
+ * 4. Aggregate functions support two-phase execution
+ * 5. Data volume checks pass (min input rows, max group keys)
+ */
+ private List<Plan> implementBucketedPhase(LogicalAggregate<? extends Plan>
aggregate, ConnectContext ctx) {
Review Comment:
add a unit test
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBucketedHashAggregate.java:
##########
@@ -0,0 +1,251 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.physical;
+
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.DataTrait;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.algebra.Aggregate;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.statistics.Statistics;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Physical bucketed hash aggregation plan node.
+ *
+ * Fuses two-phase aggregation (local + global) into a single operator for
single-BE deployments.
+ * The sink side builds per-instance hash tables from raw input (first-phase
agg).
+ * The source side merges across instances per-bucket using direct in-memory
merge
+ * (no serialization/deserialization) and outputs the final result.
+ *
+ * This node replaces the pattern: GlobalAgg -> PhysicalDistribute -> LocalAgg
+ * with a single fused operator, eliminating exchange overhead entirely.
+ */
+public class PhysicalBucketedHashAggregate<CHILD_TYPE extends Plan> extends
PhysicalUnary<CHILD_TYPE>
+ implements Aggregate<CHILD_TYPE> {
+
+ private final List<Expression> groupByExpressions;
+ private final List<NamedExpression> outputExpressions;
+
+ public PhysicalBucketedHashAggregate(List<Expression> groupByExpressions,
+ List<NamedExpression> outputExpressions,
+ LogicalProperties logicalProperties, CHILD_TYPE child) {
+ this(groupByExpressions, outputExpressions, Optional.empty(),
logicalProperties, child);
+ }
+
+ public PhysicalBucketedHashAggregate(List<Expression> groupByExpressions,
+ List<NamedExpression> outputExpressions,
+ Optional<GroupExpression> groupExpression,
+ LogicalProperties logicalProperties, CHILD_TYPE child) {
+ super(PlanType.PHYSICAL_BUCKETED_HASH_AGGREGATE, groupExpression,
logicalProperties, child);
+ this.groupByExpressions = ImmutableList.copyOf(
+ Objects.requireNonNull(groupByExpressions, "groupByExpressions
cannot be null"));
+ this.outputExpressions = ImmutableList.copyOf(
+ Objects.requireNonNull(outputExpressions, "outputExpressions
cannot be null"));
+ }
+
+ /**
+ * Constructor with group expression, logical properties, physical
properties, and statistics.
+ */
+ public PhysicalBucketedHashAggregate(List<Expression> groupByExpressions,
+ List<NamedExpression> outputExpressions,
+ Optional<GroupExpression> groupExpression,
+ LogicalProperties logicalProperties,
+ PhysicalProperties physicalProperties, Statistics statistics,
CHILD_TYPE child) {
+ super(PlanType.PHYSICAL_BUCKETED_HASH_AGGREGATE, groupExpression,
logicalProperties,
+ physicalProperties, statistics, child);
+ this.groupByExpressions = ImmutableList.copyOf(
+ Objects.requireNonNull(groupByExpressions, "groupByExpressions
cannot be null"));
+ this.outputExpressions = ImmutableList.copyOf(
+ Objects.requireNonNull(outputExpressions, "outputExpressions
cannot be null"));
+ }
+
+ @Override
+ public List<Expression> getGroupByExpressions() {
+ return groupByExpressions;
+ }
+
+ @Override
+ public List<NamedExpression> getOutputExpressions() {
+ return outputExpressions;
+ }
+
+ @Override
+ public List<NamedExpression> getOutputs() {
+ return outputExpressions;
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitPhysicalBucketedHashAggregate(this, context);
+ }
+
+ @Override
+ public List<? extends Expression> getExpressions() {
+ return new ImmutableList.Builder<Expression>()
+ .addAll(groupByExpressions)
+ .addAll(outputExpressions)
+ .build();
+ }
+
+ @Override
+ public String toString() {
+ return Utils.toSqlString("PhysicalBucketedHashAggregate[" + id.asInt()
+ "]" + getGroupIdWithPrefix(),
+ "stats", statistics,
+ "groupByExpr", groupByExpressions,
+ "outputExpr", outputExpressions
+ );
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PhysicalBucketedHashAggregate<?> that =
(PhysicalBucketedHashAggregate<?>) o;
+ return Objects.equals(groupByExpressions, that.groupByExpressions)
+ && Objects.equals(outputExpressions, that.outputExpressions);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(groupByExpressions, outputExpressions);
+ }
+
+ @Override
+ public PhysicalBucketedHashAggregate<Plan> withChildren(List<Plan>
children) {
+ Preconditions.checkArgument(children.size() == 1);
+ return new PhysicalBucketedHashAggregate<>(groupByExpressions,
outputExpressions,
+ groupExpression, getLogicalProperties(),
+ physicalProperties, statistics, children.get(0));
+ }
+
+ @Override
+ public PhysicalBucketedHashAggregate<CHILD_TYPE> withGroupExpression(
+ Optional<GroupExpression> groupExpression) {
+ return new PhysicalBucketedHashAggregate<>(groupByExpressions,
outputExpressions,
+ groupExpression, getLogicalProperties(), child());
+ }
+
+ @Override
+ public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression>
groupExpression,
+ Optional<LogicalProperties> logicalProperties, List<Plan>
children) {
+ Preconditions.checkArgument(children.size() == 1);
+ return new PhysicalBucketedHashAggregate<>(groupByExpressions,
outputExpressions,
+ groupExpression, logicalProperties.get(), children.get(0));
+ }
+
+ @Override
+ public PhysicalBucketedHashAggregate<CHILD_TYPE>
withPhysicalPropertiesAndStats(
+ PhysicalProperties physicalProperties, Statistics statistics) {
+ return new PhysicalBucketedHashAggregate<>(groupByExpressions,
outputExpressions,
+ groupExpression, getLogicalProperties(),
+ physicalProperties, statistics, child());
+ }
+
+ @Override
+ public PhysicalBucketedHashAggregate<CHILD_TYPE>
withAggOutput(List<NamedExpression> newOutput) {
+ return new PhysicalBucketedHashAggregate<>(groupByExpressions,
newOutput,
+ Optional.empty(), getLogicalProperties(),
+ physicalProperties, statistics, child());
+ }
+
+ @Override
+ public String shapeInfo() {
+ StringBuilder builder = new StringBuilder("bucketedHashAgg[");
+ builder.append("BUCKETED");
+ builder.append(']');
Review Comment:
bucketedHashAgg is enough
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBucketedHashAggregate.java:
##########
@@ -0,0 +1,251 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.physical;
+
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.DataTrait;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.algebra.Aggregate;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.statistics.Statistics;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Physical bucketed hash aggregation plan node.
+ *
+ * Fuses two-phase aggregation (local + global) into a single operator for
single-BE deployments.
+ * The sink side builds per-instance hash tables from raw input (first-phase
agg).
+ * The source side merges across instances per-bucket using direct in-memory
merge
+ * (no serialization/deserialization) and outputs the final result.
+ *
+ * This node replaces the pattern: GlobalAgg -> PhysicalDistribute -> LocalAgg
+ * with a single fused operator, eliminating exchange overhead entirely.
+ */
+public class PhysicalBucketedHashAggregate<CHILD_TYPE extends Plan> extends
PhysicalUnary<CHILD_TYPE>
+ implements Aggregate<CHILD_TYPE> {
+
+ private final List<Expression> groupByExpressions;
+ private final List<NamedExpression> outputExpressions;
+
+ public PhysicalBucketedHashAggregate(List<Expression> groupByExpressions,
+ List<NamedExpression> outputExpressions,
+ LogicalProperties logicalProperties, CHILD_TYPE child) {
+ this(groupByExpressions, outputExpressions, Optional.empty(),
logicalProperties, child);
+ }
+
+ public PhysicalBucketedHashAggregate(List<Expression> groupByExpressions,
+ List<NamedExpression> outputExpressions,
+ Optional<GroupExpression> groupExpression,
+ LogicalProperties logicalProperties, CHILD_TYPE child) {
+ super(PlanType.PHYSICAL_BUCKETED_HASH_AGGREGATE, groupExpression,
logicalProperties, child);
+ this.groupByExpressions = ImmutableList.copyOf(
+ Objects.requireNonNull(groupByExpressions, "groupByExpressions
cannot be null"));
+ this.outputExpressions = ImmutableList.copyOf(
+ Objects.requireNonNull(outputExpressions, "outputExpressions
cannot be null"));
+ }
+
+ /**
+ * Constructor with group expression, logical properties, physical
properties, and statistics.
+ */
+ public PhysicalBucketedHashAggregate(List<Expression> groupByExpressions,
+ List<NamedExpression> outputExpressions,
+ Optional<GroupExpression> groupExpression,
+ LogicalProperties logicalProperties,
+ PhysicalProperties physicalProperties, Statistics statistics,
CHILD_TYPE child) {
+ super(PlanType.PHYSICAL_BUCKETED_HASH_AGGREGATE, groupExpression,
logicalProperties,
+ physicalProperties, statistics, child);
+ this.groupByExpressions = ImmutableList.copyOf(
+ Objects.requireNonNull(groupByExpressions, "groupByExpressions
cannot be null"));
+ this.outputExpressions = ImmutableList.copyOf(
+ Objects.requireNonNull(outputExpressions, "outputExpressions
cannot be null"));
+ }
+
+ @Override
+ public List<Expression> getGroupByExpressions() {
+ return groupByExpressions;
+ }
+
+ @Override
+ public List<NamedExpression> getOutputExpressions() {
+ return outputExpressions;
+ }
+
+ @Override
+ public List<NamedExpression> getOutputs() {
+ return outputExpressions;
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitPhysicalBucketedHashAggregate(this, context);
+ }
+
+ @Override
+ public List<? extends Expression> getExpressions() {
+ return new ImmutableList.Builder<Expression>()
+ .addAll(groupByExpressions)
+ .addAll(outputExpressions)
+ .build();
+ }
+
+ @Override
+ public String toString() {
+ return Utils.toSqlString("PhysicalBucketedHashAggregate[" + id.asInt()
+ "]" + getGroupIdWithPrefix(),
+ "stats", statistics,
+ "groupByExpr", groupByExpressions,
+ "outputExpr", outputExpressions
+ );
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PhysicalBucketedHashAggregate<?> that =
(PhysicalBucketedHashAggregate<?>) o;
+ return Objects.equals(groupByExpressions, that.groupByExpressions)
+ && Objects.equals(outputExpressions, that.outputExpressions);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(groupByExpressions, outputExpressions);
+ }
+
+ @Override
+ public PhysicalBucketedHashAggregate<Plan> withChildren(List<Plan>
children) {
+ Preconditions.checkArgument(children.size() == 1);
+ return new PhysicalBucketedHashAggregate<>(groupByExpressions,
outputExpressions,
+ groupExpression, getLogicalProperties(),
+ physicalProperties, statistics, children.get(0));
+ }
+
+ @Override
+ public PhysicalBucketedHashAggregate<CHILD_TYPE> withGroupExpression(
+ Optional<GroupExpression> groupExpression) {
+ return new PhysicalBucketedHashAggregate<>(groupByExpressions,
outputExpressions,
+ groupExpression, getLogicalProperties(), child());
+ }
+
+ @Override
+ public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression>
groupExpression,
+ Optional<LogicalProperties> logicalProperties, List<Plan>
children) {
+ Preconditions.checkArgument(children.size() == 1);
+ return new PhysicalBucketedHashAggregate<>(groupByExpressions,
outputExpressions,
+ groupExpression, logicalProperties.get(), children.get(0));
+ }
+
+ @Override
+ public PhysicalBucketedHashAggregate<CHILD_TYPE>
withPhysicalPropertiesAndStats(
+ PhysicalProperties physicalProperties, Statistics statistics) {
+ return new PhysicalBucketedHashAggregate<>(groupByExpressions,
outputExpressions,
+ groupExpression, getLogicalProperties(),
+ physicalProperties, statistics, child());
+ }
+
+ @Override
+ public PhysicalBucketedHashAggregate<CHILD_TYPE>
withAggOutput(List<NamedExpression> newOutput) {
+ return new PhysicalBucketedHashAggregate<>(groupByExpressions,
newOutput,
+ Optional.empty(), getLogicalProperties(),
+ physicalProperties, statistics, child());
+ }
+
+ @Override
+ public String shapeInfo() {
+ StringBuilder builder = new StringBuilder("bucketedHashAgg[");
+ builder.append("BUCKETED");
+ builder.append(']');
+ return builder.toString();
+ }
+
+ @Override
+ public List<Slot> computeOutput() {
+ return outputExpressions.stream()
+ .map(NamedExpression::toSlot)
+ .collect(ImmutableList.toImmutableList());
+ }
+
+ @Override
+ public PhysicalBucketedHashAggregate<CHILD_TYPE> resetLogicalProperties() {
+ return new PhysicalBucketedHashAggregate<>(groupByExpressions,
outputExpressions,
+ groupExpression, null,
+ physicalProperties, statistics, child());
+ }
+
+ @Override
+ public void computeUnique(DataTrait.Builder builder) {
+ DataTrait childFd = child(0).getLogicalProperties().getTrait();
+
+ ImmutableSet.Builder<Slot> groupByKeysBuilder = ImmutableSet.builder();
+ for (Expression expr : groupByExpressions) {
+ groupByKeysBuilder.addAll(expr.getInputSlots());
+ }
+ ImmutableSet<Slot> groupByKeys = groupByKeysBuilder.build();
+
+ if (groupByExpressions.isEmpty() ||
childFd.isUniformAndNotNull(groupByKeys)) {
+ getOutput().forEach(builder::addUniqueSlot);
+ return;
+ }
+
+ builder.addUniqueSlot(childFd);
+ builder.addUniqueSlot(groupByKeys);
+ }
+
+ @Override
+ public void computeUniform(DataTrait.Builder builder) {
Review Comment:
should same as
`org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate#computeUniform`
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java:
##########
@@ -223,6 +224,14 @@ public PhysicalProperties visitPhysicalHashAggregate(
}
}
+ @Override
+ public PhysicalProperties visitPhysicalBucketedHashAggregate(
+ PhysicalBucketedHashAggregate<? extends Plan> agg, PlanContext
context) {
+ Preconditions.checkState(childrenOutputProperties.size() == 1);
+ PhysicalProperties childOutputProperty =
childrenOutputProperties.get(0);
+ return new
PhysicalProperties(childOutputProperty.getDistributionSpec());
Review Comment:
does the output still obey the data distribution of the child nodes? Is no data
exchanged between instances?
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBucketedHashAggregate.java:
##########
@@ -0,0 +1,251 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.physical;
+
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.DataTrait;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.algebra.Aggregate;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.statistics.Statistics;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Physical bucketed hash aggregation plan node.
+ *
+ * Fuses two-phase aggregation (local + global) into a single operator for
single-BE deployments.
+ * The sink side builds per-instance hash tables from raw input (first-phase
agg).
+ * The source side merges across instances per-bucket using direct in-memory
merge
+ * (no serialization/deserialization) and outputs the final result.
+ *
+ * This node replaces the pattern: GlobalAgg -> PhysicalDistribute -> LocalAgg
+ * with a single fused operator, eliminating exchange overhead entirely.
+ */
+public class PhysicalBucketedHashAggregate<CHILD_TYPE extends Plan> extends
PhysicalUnary<CHILD_TYPE>
+ implements Aggregate<CHILD_TYPE> {
+
+ private final List<Expression> groupByExpressions;
+ private final List<NamedExpression> outputExpressions;
+
+ public PhysicalBucketedHashAggregate(List<Expression> groupByExpressions,
+ List<NamedExpression> outputExpressions,
+ LogicalProperties logicalProperties, CHILD_TYPE child) {
+ this(groupByExpressions, outputExpressions, Optional.empty(),
logicalProperties, child);
+ }
+
+ public PhysicalBucketedHashAggregate(List<Expression> groupByExpressions,
+ List<NamedExpression> outputExpressions,
+ Optional<GroupExpression> groupExpression,
+ LogicalProperties logicalProperties, CHILD_TYPE child) {
+ super(PlanType.PHYSICAL_BUCKETED_HASH_AGGREGATE, groupExpression,
logicalProperties, child);
+ this.groupByExpressions = ImmutableList.copyOf(
+ Objects.requireNonNull(groupByExpressions, "groupByExpressions
cannot be null"));
+ this.outputExpressions = ImmutableList.copyOf(
+ Objects.requireNonNull(outputExpressions, "outputExpressions
cannot be null"));
+ }
+
+ /**
+ * Constructor with group expression, logical properties, physical
properties, and statistics.
+ */
+ public PhysicalBucketedHashAggregate(List<Expression> groupByExpressions,
+ List<NamedExpression> outputExpressions,
+ Optional<GroupExpression> groupExpression,
+ LogicalProperties logicalProperties,
+ PhysicalProperties physicalProperties, Statistics statistics,
CHILD_TYPE child) {
+ super(PlanType.PHYSICAL_BUCKETED_HASH_AGGREGATE, groupExpression,
logicalProperties,
+ physicalProperties, statistics, child);
+ this.groupByExpressions = ImmutableList.copyOf(
+ Objects.requireNonNull(groupByExpressions, "groupByExpressions
cannot be null"));
+ this.outputExpressions = ImmutableList.copyOf(
+ Objects.requireNonNull(outputExpressions, "outputExpressions
cannot be null"));
+ }
+
+ @Override
+ public List<Expression> getGroupByExpressions() {
+ return groupByExpressions;
+ }
+
+ @Override
+ public List<NamedExpression> getOutputExpressions() {
+ return outputExpressions;
+ }
+
+ @Override
+ public List<NamedExpression> getOutputs() {
+ return outputExpressions;
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitPhysicalBucketedHashAggregate(this, context);
+ }
+
+ @Override
+ public List<? extends Expression> getExpressions() {
+ return new ImmutableList.Builder<Expression>()
+ .addAll(groupByExpressions)
+ .addAll(outputExpressions)
+ .build();
+ }
+
+ @Override
+ public String toString() {
+ return Utils.toSqlString("PhysicalBucketedHashAggregate[" + id.asInt()
+ "]" + getGroupIdWithPrefix(),
+ "stats", statistics,
+ "groupByExpr", groupByExpressions,
+ "outputExpr", outputExpressions
+ );
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PhysicalBucketedHashAggregate<?> that =
(PhysicalBucketedHashAggregate<?>) o;
+ return Objects.equals(groupByExpressions, that.groupByExpressions)
+ && Objects.equals(outputExpressions, that.outputExpressions);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(groupByExpressions, outputExpressions);
+ }
+
+ @Override
+ public PhysicalBucketedHashAggregate<Plan> withChildren(List<Plan>
children) {
+ Preconditions.checkArgument(children.size() == 1);
+ return new PhysicalBucketedHashAggregate<>(groupByExpressions,
outputExpressions,
+ groupExpression, getLogicalProperties(),
+ physicalProperties, statistics, children.get(0));
+ }
+
+ @Override
+ public PhysicalBucketedHashAggregate<CHILD_TYPE> withGroupExpression(
+ Optional<GroupExpression> groupExpression) {
+ return new PhysicalBucketedHashAggregate<>(groupByExpressions,
outputExpressions,
+ groupExpression, getLogicalProperties(), child());
+ }
+
+ @Override
+ public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression>
groupExpression,
+ Optional<LogicalProperties> logicalProperties, List<Plan>
children) {
+ Preconditions.checkArgument(children.size() == 1);
+ return new PhysicalBucketedHashAggregate<>(groupByExpressions,
outputExpressions,
+ groupExpression, logicalProperties.get(), children.get(0));
+ }
+
+ @Override
+ public PhysicalBucketedHashAggregate<CHILD_TYPE>
withPhysicalPropertiesAndStats(
+ PhysicalProperties physicalProperties, Statistics statistics) {
+ return new PhysicalBucketedHashAggregate<>(groupByExpressions,
outputExpressions,
+ groupExpression, getLogicalProperties(),
+ physicalProperties, statistics, child());
+ }
+
+ @Override
+ public PhysicalBucketedHashAggregate<CHILD_TYPE>
withAggOutput(List<NamedExpression> newOutput) {
+ return new PhysicalBucketedHashAggregate<>(groupByExpressions,
newOutput,
+ Optional.empty(), getLogicalProperties(),
+ physicalProperties, statistics, child());
+ }
+
+ @Override
+ public String shapeInfo() {
+ StringBuilder builder = new StringBuilder("bucketedHashAgg[");
+ builder.append("BUCKETED");
+ builder.append(']');
+ return builder.toString();
+ }
+
+ @Override
+ public List<Slot> computeOutput() {
+ return outputExpressions.stream()
+ .map(NamedExpression::toSlot)
+ .collect(ImmutableList.toImmutableList());
+ }
+
+ @Override
+ public PhysicalBucketedHashAggregate<CHILD_TYPE> resetLogicalProperties() {
+ return new PhysicalBucketedHashAggregate<>(groupByExpressions,
outputExpressions,
+ groupExpression, null,
+ physicalProperties, statistics, child());
+ }
+
+ @Override
+ public void computeUnique(DataTrait.Builder builder) {
+ DataTrait childFd = child(0).getLogicalProperties().getTrait();
Review Comment:
should same as
`org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate#computeUnique`
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggWithoutDistinct.java:
##########
@@ -165,6 +183,285 @@ public Void visitSessionVarGuardExpr(SessionVarGuardExpr
expr, Map<String, Strin
aggregate.getLogicalProperties(), localAgg));
}
/**
 * Implements bucketed hash aggregation for single-BE deployments.
 * Fuses two-phase aggregation into a single PhysicalBucketedHashAggregate operator,
 * eliminating exchange overhead and serialization/deserialization costs.
 *
 * <p>Only generated when:
 * <ol>
 * <li>enable_bucketed_hash_agg session variable is true</li>
 * <li>Cluster has exactly one alive BE</li>
 * <li>Aggregate has GROUP BY keys (no without-key aggregation)</li>
 * <li>Aggregate functions support two-phase execution</li>
 * <li>Data volume checks pass (min input rows, max group keys)</li>
 * </ol>
 *
 * @param aggregate the logical aggregate being implemented
 * @param ctx session context supplying tuning variables and cluster info
 * @return a single-element list holding the bucketed physical plan, or an
 *         empty list when any eligibility check fails
 */
private List<Plan> implementBucketedPhase(LogicalAggregate<? extends Plan> aggregate, ConnectContext ctx) {
    // Feature gate: session variable must be enabled.
    if (!ctx.getSessionVariable().enableBucketedHashAgg) {
        return ImmutableList.of();
    }
    // Only for single-BE deployments (count alive backends only).
    int beNumber = ctx.getEnv().getClusterInfo().getBackendsNumber(true);
    if (beNumber != 1) {
        return ImmutableList.of();
    }
    // Without-key aggregation not supported in the initial version.
    if (aggregate.getGroupByExpressions().isEmpty()) {
        return ImmutableList.of();
    }
    // Must support two-phase execution (same check as splitTwoPhase).
    if (!aggregate.supportAggregatePhase(AggregatePhase.TWO)) {
        return ImmutableList.of();
    }
    // Skip aggregates with no aggregate functions (pure GROUP BY dedup).
    // These are produced by DistinctAggregateRewriter as the bottom dedup phase.
    if (aggregate.getAggregateFunctions().isEmpty()) {
        return ImmutableList.of();
    }
    // Skip aggregates containing multi-distinct functions (e.g., multi_distinct_count,
    // multi_distinct_sum). These are semantically distinct aggregations rewritten by
    // DistinctAggregateRewriter — they embed deduplication in the BE-level function.
    // The bucketed agg cost model does not account for deduplication overhead, which
    // causes the base-table bucketed path to appear artificially cheap compared to
    // materialized views using pre-aggregated bitmap_union/hll_union.
    for (AggregateFunction func : aggregate.getAggregateFunctions()) {
        if (func instanceof MultiDistinction) {
            return ImmutableList.of();
        }
    }
    // Skip aggregates whose child group contains a LogicalAggregate.
    // This detects the top aggregate in a DISTINCT decomposition (e.g.,
    // COUNT(DISTINCT a) GROUP BY b is rewritten to COUNT(a) GROUP BY b
    // on top of GROUP BY a,b dedup). Bucketed agg does not support
    // DISTINCT aggregation in the initial version.
    if (childGroupContainsAggregate(aggregate)) {
        return ImmutableList.of();
    }
    // Skip when the sortByGroupKey optimization applies. This is detected by
    // checking if the aggregate's owner group has a LogicalTopN parent
    // whose order key expressions equal the group-by keys (produced by
    // LimitAggToTopNAgg rewrite). PhysicalBucketedHashAggregate does not
    // support sortByGroupKey, so we yield to the regular hash-agg plan.
    if (hasSortByGroupKeyTopN(aggregate)) {
        return ImmutableList.of();
    }
    // Skip when data is already distributed by the GROUP BY keys
    // (e.g., table bucketed by UserID, query GROUP BY UserID).
    // In this case the two-phase plan needs no exchange and is strictly
    // better than bucketed agg (no 256-bucket overhead, no merge phase).
    if (groupByKeysSatisfyDistribution(aggregate)) {
        return ImmutableList.of();
    }
    // Data-volume-based checks: control bucketed agg eligibility based on
    // estimated data scale, similar to ClickHouse's group_by_two_level_threshold
    // and group_by_two_level_threshold_bytes. This reduces reliance on
    // column-level statistics which may be inaccurate or missing.
    //
    // When statistics are unavailable (groupExpression absent or childStats null),
    // conservatively skip bucketed agg — without data volume information we cannot
    // make an informed decision, and the risk of choosing bucketed agg in a
    // high-cardinality scenario outweighs the potential benefit.
    if (!aggregate.getGroupExpression().isPresent()) {
        return ImmutableList.of();
    }
    GroupExpression ge = aggregate.getGroupExpression().get();
    Statistics childStats = ge.childStatistics(0);
    if (childStats == null) {
        return ImmutableList.of();
    }
    double rows = childStats.getRowCount();
    long minInputRows = ctx.getSessionVariable().bucketedAggMinInputRows;
    long maxGroupKeys = ctx.getSessionVariable().bucketedAggMaxGroupKeys;

    // Gate: minimum input rows (a non-positive setting disables this gate).
    // When input data is too small, the overhead of initializing 256
    // per-bucket hash tables and the pipelined merge phase outweighs
    // the benefit of eliminating exchange. Skip bucketed agg.
    if (minInputRows > 0 && rows < minInputRows) {
        return ImmutableList.of();
    }

    // Gate: maximum estimated group keys (similar to ClickHouse's
    // group_by_two_level_threshold). When the number of distinct groups
    // is too large, the source-side merge must combine too many keys
    // across instances, and the merge cost dominates. Skip bucketed agg.
    // NOTE(review): when aggStats is null this gate is silently skipped,
    // unlike the conservative childStats-null handling above — confirm intended.
    Statistics aggStats = ge.getOwnerGroup().getStatistics();
    if (maxGroupKeys > 0 && aggStats != null && aggStats.getRowCount() > maxGroupKeys) {
        return ImmutableList.of();
    }

    // High-cardinality ratio checks (existing logic).
    // These complement the absolute thresholds above with relative checks:
    // 1. Single-column NDV check: if ANY GROUP BY key's NDV > rows * threshold,
    //    the combined NDV is at least that high.
    // 2. Aggregation ratio check: if estimated output rows > rows * threshold,
    //    merge cost dominates.
    double highCardThreshold = 0.3;
    for (Expression groupByKey : aggregate.getGroupByExpressions()) {
        ColumnStatistic colStat = childStats.findColumnStatistics(groupByKey);
        if (colStat != null && !colStat.isUnKnown() && colStat.ndv > rows * highCardThreshold) {
            return ImmutableList.of();
        }
    }
    if (aggStats != null && aggStats.getRowCount() > rows * highCardThreshold) {
        return ImmutableList.of();
    }
    // Build output expressions: rewrite AggregateFunction -> AggregateExpression
    // with the GLOBAL_RESULT param (same as one-phase aggregation — raw input
    // directly produces the final result).
    List<NamedExpression> aggOutput = ExpressionUtils.rewriteDownShortCircuit(
            aggregate.getOutputExpressions(), expr -> {
                if (!(expr instanceof AggregateFunction)) {
                    return expr;
                }
                return new AggregateExpression((AggregateFunction) expr, AggregateParam.GLOBAL_RESULT);
            }
    );
    return ImmutableList.of(new PhysicalBucketedHashAggregate<>(
            aggregate.getGroupByExpressions(), aggOutput,
            aggregate.getLogicalProperties(), aggregate.child()));
}
+
+ /**
+ * Check if the child group of this aggregate contains a LogicalAggregate.
+ * This is used to detect aggregates produced by DISTINCT decomposition
rewrites
+ * (e.g., DistinctAggregateRewriter, SplitMultiDistinctStrategy), where
the original
+ * DISTINCT aggregate is split into a top non-distinct aggregate over a
bottom dedup aggregate.
+ */
+ private boolean childGroupContainsAggregate(LogicalAggregate<? extends
Plan> aggregate) {
+ if (!aggregate.getGroupExpression().isPresent()) {
+ return false;
+ }
+ GroupExpression groupExpr = aggregate.getGroupExpression().get();
+ if (groupExpr.arity() == 0) {
+ return false;
+ }
+ Group childGroup = groupExpr.child(0);
+ for (GroupExpression childGroupExpr :
childGroup.getLogicalExpressions()) {
+ if (childGroupExpr.getPlan() instanceof LogicalAggregate) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Check if a LogicalTopN parent exists whose order keys are identical to
+ * the aggregate's group-by keys. This means PushTopnToAgg will later set
+ * sortByGroupKey on PhysicalHashAggregate; bucketed agg doesn't support
+ * that optimization so we skip it.
+ *
+ * Handles both TopN->Agg and TopN->Project->Agg patterns.
+ */
+ private boolean hasSortByGroupKeyTopN(LogicalAggregate<? extends Plan>
aggregate) {
+ if (!aggregate.getGroupExpression().isPresent()) {
+ return false;
+ }
+ List<Expression> groupByKeys = aggregate.getGroupByExpressions();
+ Group ownerGroup =
aggregate.getGroupExpression().get().getOwnerGroup();
+ for (GroupExpression parentGE :
ownerGroup.getParentGroupExpressions()) {
+ Plan parentPlan = parentGE.getPlan();
+ if (parentPlan instanceof LogicalTopN
+ && orderKeysMatchGroupKeys((LogicalTopN<?>) parentPlan,
groupByKeys)) {
+ return true;
+ }
+ if (parentPlan instanceof LogicalProject &&
parentGE.getOwnerGroup() != null) {
+ for (GroupExpression gpGE :
parentGE.getOwnerGroup().getParentGroupExpressions()) {
+ if (gpGE.getPlan() instanceof LogicalTopN
+ && orderKeysMatchGroupKeys((LogicalTopN<?>)
gpGE.getPlan(), groupByKeys)) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ private boolean orderKeysMatchGroupKeys(LogicalTopN<?> topN,
List<Expression> groupByKeys) {
+ List<OrderKey> orderKeys = topN.getOrderKeys();
+ if (orderKeys.size() != groupByKeys.size()) {
+ return false;
+ }
+ for (int i = 0; i < groupByKeys.size(); i++) {
+ if (!groupByKeys.get(i).equals(orderKeys.get(i).getExpr())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Check if the GROUP BY keys of this aggregate are a superset of (or
equal to)
+ * the underlying OlapTable's hash distribution columns. When this is true,
+ * the data is already correctly partitioned for the aggregation, so the
+ * two-phase plan (local + global) requires no exchange and is strictly
better
+ * than bucketed agg (no 256-bucket overhead, no merge phase).
+ *
+ * Traverses the child group in the Memo to find a LogicalOlapScan,
+ * walking through LogicalProject and LogicalFilter transparently.
+ */
+ private boolean groupByKeysSatisfyDistribution(LogicalAggregate<? extends
Plan> aggregate) {
+ if (!aggregate.getGroupExpression().isPresent()) {
+ return false;
+ }
+ GroupExpression groupExpr = aggregate.getGroupExpression().get();
+ if (groupExpr.arity() == 0) {
+ return false;
+ }
+ OlapTable table = findOlapTableInGroup(groupExpr.child(0), 5);
Review Comment:
if the aggregate's child is not an OLAP table, will we still generate a bucketed
aggregate even if the data distribution satisfies the groupByKeys?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]