morrySnow commented on code in PR #65024:
URL: https://github.com/apache/doris/pull/65024#discussion_r3503214244
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java:
##########
@@ -158,6 +158,12 @@ private boolean shouldBanOnePhaseAgg(PhysicalHashAggregate<? extends Plan> aggre
// group by key is skew
return skewOnShuffleExpr(aggregate);
} else {
+ // Bucketed hash agg exception: on single-BE with bucketed agg enabled,
+ // the one-phase GLOBAL + distribute pattern is the basis for translator
+ // fusion into BucketedAggregationNode. Allow this pattern.
+ if (AggregateUtils.isBucketedHashAggEnabled(aggregate.getGroupByExpressions().size())) {
Review Comment:
The bucketed agg exception here allows the one-phase GLOBAL + distribute
pattern whenever the basic eligibility check passes (session variable enabled,
single BE, no smooth upgrade in progress, GROUP BY present). However, the old
`implementBucketedPhase` also checked:
- `bucketedAggMaxGroupKeys`: skip when the estimated number of groups exceeds
  the threshold
- `bucketedAggHighCardThreshold`: skip when NDV / output rows indicate high
  cardinality
- `hasSortByGroupKeyTopN`: skip when the sortByGroupKey optimization is
  applicable
- `groupByKeysSatisfyDistribution`: skip when the data is already distributed
  by the group keys
- Empty agg functions / MultiDistinction / child group containing a
  LogicalAggregate

Only `bucketedAggMinInputRows` is preserved (in CostModel). With the other
checks dropped, bucketed agg can now be chosen in scenarios the old code
correctly avoided. Consider re-adding the most critical guards here, or
documenting why they are no longer needed.
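
To make the suggestion concrete, here is a minimal sketch of how the dropped
guards could be grouped into one predicate. Everything in it is hypothetical:
`BucketedAggGuardSketch`, `Candidate`, and `passesLegacyGuards` are
illustrative names, not Doris APIs, and the high-cardinality test is assumed
to be a simple groups-to-input-rows ratio, which may not match what
`implementBucketedPhase` actually computed.

```java
/** Hypothetical sketch; none of these names exist in Doris. */
final class BucketedAggGuardSketch {

    /** Assumed snapshot of the facts the old guards looked at. */
    static final class Candidate {
        final double estimatedGroups;
        final double inputRows;
        final boolean sortByGroupKeyTopN;
        final boolean distributedByGroupKeys;
        final boolean hasAggFunctions;

        Candidate(double estimatedGroups, double inputRows, boolean sortByGroupKeyTopN,
                boolean distributedByGroupKeys, boolean hasAggFunctions) {
            this.estimatedGroups = estimatedGroups;
            this.inputRows = inputRows;
            this.sortByGroupKeyTopN = sortByGroupKeyTopN;
            this.distributedByGroupKeys = distributedByGroupKeys;
            this.hasAggFunctions = hasAggFunctions;
        }
    }

    /** Returns true only if none of the legacy "skip" conditions applies. */
    static boolean passesLegacyGuards(Candidate c, long maxGroupKeys, double highCardRatio) {
        if (!c.hasAggFunctions) {
            return false; // empty agg functions
        }
        if (c.sortByGroupKeyTopN) {
            return false; // sortByGroupKey optimization applies instead
        }
        if (c.distributedByGroupKeys) {
            return false; // data already distributed by the group keys
        }
        if (c.estimatedGroups > maxGroupKeys) {
            return false; // too many estimated groups
        }
        // Assumed definition of "high cardinality": groups per input row.
        if (c.inputRows > 0 && c.estimatedGroups / c.inputRows > highCardRatio) {
            return false;
        }
        return true;
    }
}
```

A predicate like this could be called next to the
`AggregateUtils.isBucketedHashAggEnabled` check, provided the regulator has
access to the statistics it needs at that point.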
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java:
##########
@@ -217,6 +223,14 @@ private boolean onePhaseAggWithDistribute(PhysicalHashAggregate<? extends Plan>
&& children.get(0).getPlan() instanceof PhysicalDistribute;
}
+ /**
+ * Check whether bucketed hash aggregation allows the one-phase GLOBAL + distribute
+ * pattern. Delegates to the shared eligibility check in AggregateUtils.
+ */
+ private boolean shouldAllowForBucketedAgg(PhysicalHashAggregate<? extends Plan> aggregate) {
Review Comment:
This method is defined but never called anywhere in the codebase. It appears
to be dead code left over from the refactoring. Either wire it into
`shouldBanOnePhaseAgg` (replacing the inline
`AggregateUtils.isBucketedHashAggEnabled` call at line 164) or remove it.
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java:
##########
@@ -3068,9 +2988,125 @@ private PlanFragment connectJoinNode(HashJoinNode hashJoinNode, PlanFragment lef
return leftFragment;
}
+ /**
+ * Check whether the one-phase GLOBAL hash aggregate can be fused with its
+ * distribute child into a BucketedAggregationNode. This eliminates exchange
+ * overhead on single-BE deployments by using in-memory per-bucket merging.
+ */
+ private boolean shouldUseBucketedFusion(PhysicalHashAggregate<? extends Plan> aggregate) {
+ // Shared eligibility: session var, single-BE, GROUP BY, smooth upgrade
+ if (!AggregateUtils.isBucketedHashAggEnabled(aggregate.getGroupByExpressions().size())) {
+ return false;
+ }
+ // Must be one-phase: GLOBAL + INPUT_TO_RESULT
+ if (aggregate.getAggPhase() != AggPhase.GLOBAL
+ || aggregate.getAggMode() != AggMode.INPUT_TO_RESULT) {
+ return false;
+ }
+ // Child must be PhysicalDistribute with hash distribution matching group keys
+ Plan child = aggregate.child(0);
+ if (!(child instanceof PhysicalDistribute)) {
+ return false;
+ }
+ DistributionSpec distSpec = ((PhysicalDistribute<?>) child).getDistributionSpec();
+ if (!(distSpec instanceof DistributionSpecHash)) {
+ return false;
+ }
+ List<ExprId> distKeys = ((DistributionSpecHash) distSpec).getOrderedShuffledColumns();
+ List<ExprId> groupByKeys = aggregate.getGroupByExpressions().stream()
Review Comment:
The `groupByKeys` list keeps only `SlotReference` instances via
`.filter(SlotReference.class::isInstance)`. If any group-by expression is a
CAST-wrapped slot (e.g., `CAST(a AS BIGINT)` where `a` is INT), it is dropped
from the list without any notice, so `distKeys.equals(groupByKeys)` fails and
the fusion is skipped.

This is conservative and safe (no incorrect fusion), but fusion opportunities
are silently missed whenever the group-by uses CAST expressions. Consider
either:
1. Adding logic to unwrap CAST expressions to extract the underlying
   SlotReference, or
2. Adding a log/debug message when fusion is skipped due to
   non-SlotReference group-by expressions.
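
A rough sketch of option 1, using a tiny stand-in expression hierarchy rather
than the real Nereids classes. `Expr`, `Slot`, `Cast`, `Other`, and both
helper methods are hypothetical names introduced for illustration only. The
sketch strips every CAST it finds and does not check whether the cast is
injective; a narrowing cast can merge distinct input values into one group,
so a real implementation would have to restrict which casts it unwraps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch; these types only mimic the shape of the real expressions. */
final class CastUnwrapSketch {
    interface Expr {}

    static final class Slot implements Expr {
        final int exprId;

        Slot(int exprId) {
            this.exprId = exprId;
        }
    }

    static final class Cast implements Expr {
        final Expr child;

        Cast(Expr child) {
            this.child = child;
        }
    }

    /** Any expression that is neither a slot nor a cast, e.g. a function call. */
    static final class Other implements Expr {}

    /** Peels nested casts and returns the slot id underneath, if there is one. */
    static Optional<Integer> underlyingSlotId(Expr expr) {
        Expr current = expr;
        while (current instanceof Cast) {
            current = ((Cast) current).child;
        }
        if (current instanceof Slot) {
            return Optional.of(((Slot) current).exprId);
        }
        return Optional.empty();
    }

    /**
     * Returns the slot ids of all group-by keys in order, or empty if any key
     * is not a (possibly cast-wrapped) slot. Returning empty instead of
     * filtering makes the "fusion skipped" case explicit, so it can be logged.
     */
    static Optional<List<Integer>> groupByKeyIds(List<Expr> groupBy) {
        List<Integer> ids = new ArrayList<>();
        for (Expr expr : groupBy) {
            Optional<Integer> id = underlyingSlotId(expr);
            if (!id.isPresent()) {
                return Optional.empty();
            }
            ids.add(id.get());
        }
        return Optional.of(ids);
    }
}
```

The empty result gives the caller a natural place for the debug message from
option 2.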
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModel.java:
##########
@@ -361,25 +362,25 @@ public Cost visitPhysicalHashAggregate(
inputStatistics.getRowCount() / beNumber, 0);
} else {
int factor = aggregate.getGroupByExpressions().isEmpty() ? 1 : beNumber;
- // global
+ double rowCost = inputStatistics.getRowCount() / factor;
+
+ // Bucketed fusion discount: when the one-phase GLOBAL INPUT_TO_RESULT
+ // aggregate is generated with a PhysicalDistribute child on single BE,
+ // the translator will fuse them into a BucketedAggregationNode.
+ // Apply a discount to make this path preferred over two-phase
+ // (local+global) aggregation which requires extra serialize/deserialize.
+ if (aggregate.getAggMode() == AggMode.INPUT_TO_RESULT
+ && beNumber == 1
+ && AggregateUtils.isBucketedHashAggEnabled(aggregate.getGroupByExpressions().size())
+ && inputStatistics.getRowCount()
+ >= context.getSessionVariable().bucketedAggMinInputRows) {
Review Comment:
The discount factor `0.5` (50% off) is hardcoded. Whenever the aggregate is
eligible on single-BE, the one-phase path's row cost is always halved, with no
way to adjust it. Consider:
1. Extracting this to a named constant (e.g., `BUCKETED_AGG_COST_DISCOUNT`).
2. Making it a session variable so it can be tuned without a code change.

This would provide an escape hatch if the discount proves too aggressive in
production.
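
For illustration, a minimal sketch of both suggestions together: a named
default plus a parameter that a session variable could feed. The class
`BucketedAggCostSketch` and the method `rowCost` are hypothetical, and the
`eligible` flag stands in for the whole condition in the hunk above.

```java
/** Hypothetical sketch; not the actual CostModel code. */
final class BucketedAggCostSketch {
    /** Default discount, named instead of an inline 0.5 literal. */
    static final double BUCKETED_AGG_COST_DISCOUNT = 0.5;

    /**
     * Computes the per-row cost, applying the discount only when the
     * aggregate is eligible for bucketed fusion. A discount of 1.0
     * disables the preference without touching the code.
     */
    static double rowCost(double inputRows, int factor, boolean eligible, double discount) {
        if (discount <= 0 || discount > 1) {
            throw new IllegalArgumentException("discount must be in (0, 1]: " + discount);
        }
        double rowCost = inputRows / factor;
        return eligible ? rowCost * discount : rowCost;
    }
}
```

Passing the value in, rather than reading a constant inside the method, keeps
the default and the tunable in one place and makes the discount easy to unit
test.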
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/util/AggregateUtils.java:
##########
@@ -191,4 +193,46 @@ public static boolean isOrderKeysMatchGroupKeys(List<OrderKey> orderKeys,
}
return true;
}
+
+ /**
+ * Check the basic environmental conditions for bucketed hash aggregation.
+ * This is the shared eligibility gate used by ChildrenPropertiesRegulator
+ * (to allow the one-phase-GLOBAL+distribute pattern), CostModel (for cost
+ * discount), and PhysicalPlanTranslator (for fusion into BucketedAggregationNode).
+ *
+ * @return true if the session variable is enabled, there is exactly one alive BE,
+ * no smooth upgrade is in progress, and the aggregate has GROUP BY keys.
+ */
+ public static boolean isBucketedHashAggEnabled(int groupByExprCount) {
+ ConnectContext ctx = ConnectContext.get();
+ if (ctx == null) {
+ return false;
+ }
+ if (!ctx.getSessionVariable().enableBucketedHashAgg) {
+ return false;
+ }
+ // Must have GROUP BY keys (without-key aggregation not supported)
+ if (groupByExprCount == 0) {
+ return false;
+ }
+ // Correctness gate: single-BE only (cross-BE in-memory merge is impossible).
+ // Use be_number_for_test first (set by regression tests), fall back to real cluster count.
+ int beNumber = ctx.getSessionVariable().getBeNumberForTest();
+ if (beNumber < 0) {
+ beNumber = Math.max(1, ctx.getEnv().getClusterInfo().getBackendsNumber(true));
Review Comment:
The `beNumber` fallback wraps the real count in `Math.max(1, ...)`, so if
`getBackendsNumber(true)` returns 0 (which shouldn't happen in a real cluster
but could in edge cases), the value is treated as 1 and bucketed agg is
enabled. Consider using the raw value and letting the `beNumber != 1` check
handle the zero case: with 0 BEs, bucketed agg should not be enabled.

Also, the test override checks `beNumber < 0` (any negative value means "not
set"), while the `CostModel` constructor checks `getBeNumberForTest() != -1`.
These are functionally equivalent as long as `-1` is the only sentinel value,
but the inconsistent convention is slightly confusing. Consider using the same
convention in both places.
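
A small sketch of what both points could look like combined. `BeNumberSketch`,
`BE_NUMBER_NOT_SET`, `resolveBeNumber`, and `isSingleBe` are hypothetical
names; the two `int` parameters stand in for `getBeNumberForTest()` and
`getBackendsNumber(true)`.

```java
/** Hypothetical sketch; not the actual AggregateUtils code. */
final class BeNumberSketch {
    /** Single sentinel shared by every caller that reads the test override. */
    static final int BE_NUMBER_NOT_SET = -1;

    /** Uses the test override when set, otherwise the raw alive-backend count. */
    static int resolveBeNumber(int beNumberForTest, int aliveBackends) {
        return beNumberForTest != BE_NUMBER_NOT_SET ? beNumberForTest : aliveBackends;
    }

    /** With no Math.max clamp, a count of 0 simply fails the single-BE check. */
    static boolean isSingleBe(int beNumberForTest, int aliveBackends) {
        return resolveBeNumber(beNumberForTest, aliveBackends) == 1;
    }
}
```

With this shape, `isSingleBe(-1, 0)` is false, so an empty cluster never
enables bucketed agg, and the sentinel comparison lives in one place.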
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java:
##########
@@ -3068,9 +2988,125 @@ private PlanFragment connectJoinNode(HashJoinNode hashJoinNode, PlanFragment lef
return leftFragment;
}
+ /**
+ * Check whether the one-phase GLOBAL hash aggregate can be fused with its
+ * distribute child into a BucketedAggregationNode. This eliminates exchange
+ * overhead on single-BE deployments by using in-memory per-bucket merging.
+ */
+ private boolean shouldUseBucketedFusion(PhysicalHashAggregate<? extends Plan> aggregate) {
+ // Shared eligibility: session var, single-BE, GROUP BY, smooth upgrade
+ if (!AggregateUtils.isBucketedHashAggEnabled(aggregate.getGroupByExpressions().size())) {
+ return false;
+ }
+ // Must be one-phase: GLOBAL + INPUT_TO_RESULT
+ if (aggregate.getAggPhase() != AggPhase.GLOBAL
+ || aggregate.getAggMode() != AggMode.INPUT_TO_RESULT) {
+ return false;
+ }
+ // Child must be PhysicalDistribute with hash distribution matching group keys
+ Plan child = aggregate.child(0);
+ if (!(child instanceof PhysicalDistribute)) {
+ return false;
+ }
+ DistributionSpec distSpec = ((PhysicalDistribute<?>) child).getDistributionSpec();
+ if (!(distSpec instanceof DistributionSpecHash)) {
+ return false;
+ }
+ List<ExprId> distKeys = ((DistributionSpecHash) distSpec).getOrderedShuffledColumns();
+ List<ExprId> groupByKeys = aggregate.getGroupByExpressions().stream()
+ .filter(SlotReference.class::isInstance)
+ .map(SlotReference.class::cast)
+ .map(SlotReference::getExprId)
+ .collect(Collectors.toList());
+ return distKeys.equals(groupByKeys);
+ }
+
+ /**
+ * Fuse a one-phase GLOBAL hash aggregate and its PhysicalDistribute child
+ * into a BucketedAggregationNode, skipping the exchange node entirely.
+ * Visits the distribute's child directly to keep everything in one fragment.
+ */
+ private PlanFragment visitBucketedFusion(
Review Comment:
`visitBucketedFusion` duplicates a significant amount of code from the
standard `visitPhysicalHashAggregate` path (group-by slot collection, agg
function translation, output tuple building). The old
`visitPhysicalBucketedHashAggregate` had the same duplication, so the problem
is pre-existing. However, now that both paths live in the same class (rather
than in separate visitor methods), consider extracting the shared logic into a
helper method (e.g., `buildBucketedAggInfo`) to reduce the future maintenance
burden. The PR description mentions this as a benefit of the refactor, and
actually delivering on it would make the code cleaner still.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]