morrySnow commented on code in PR #65024:
URL: https://github.com/apache/doris/pull/65024#discussion_r3503214244


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java:
##########
@@ -158,6 +158,12 @@ private boolean shouldBanOnePhaseAgg(PhysicalHashAggregate<? extends Plan> aggre
             // group by key is skew
             return skewOnShuffleExpr(aggregate);
         } else {
+            // Bucketed hash agg exception: on single-BE with bucketed agg enabled,
+            // the one-phase GLOBAL + distribute pattern is the basis for translator
+            // fusion into BucketedAggregationNode. Allow this pattern.
+            if (AggregateUtils.isBucketedHashAggEnabled(aggregate.getGroupByExpressions().size())) {

Review Comment:
   The bucketed agg exception here allows the one-phase GLOBAL + distribute 
pattern whenever the basic eligibility check passes (session var, single-BE, no 
smooth upgrade, has GROUP BY). However, the old `implementBucketedPhase` also 
checked:
   - `bucketedAggMaxGroupKeys` — skip when estimated groups exceed threshold
   - `bucketedAggHighCardThreshold` — skip when NDV / output rows indicate high 
cardinality
   - `hasSortByGroupKeyTopN` — skip when sortByGroupKey optimization is 
applicable
   - `groupByKeysSatisfyDistribution` — skip when data is already distributed 
by group keys
   - Empty agg functions / MultiDistinction / child group containing 
LogicalAggregate
   
   Only `bucketedAggMinInputRows` is preserved (in CostModel). The dropped 
checks could cause bucketed agg to be used in scenarios where the old code 
correctly avoided it. Consider re-adding the most critical guards here, or 
documenting why they are no longer needed.
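
   A minimal sketch of what re-adding the two cardinality guards could look
   like. The class and method names are hypothetical, the thresholds are passed
   in as plain parameters, and the high-cardinality ratio is assumed to be
   estimated groups divided by input rows; the old `implementBucketedPhase`
   may have defined it differently.

   ```java
   /** Hypothetical stand-in for the dropped cardinality guards; not Doris code. */
   final class BucketedAggGuardSketch {
       /**
        * Returns true only when both cardinality guards pass.
        * Assumption: "high cardinality" means estimatedGroups / inputRows
        * exceeds highCardThreshold.
        */
       static boolean passesCardinalityGuards(double estimatedGroups, double inputRows,
               long maxGroupKeys, double highCardThreshold) {
           // Guard 1: skip when the estimated number of groups exceeds the limit.
           if (estimatedGroups > maxGroupKeys) {
               return false;
           }
           // Guard 2: skip when nearly every input row forms its own group.
           if (inputRows > 0 && estimatedGroups / inputRows > highCardThreshold) {
               return false;
           }
           return true;
       }
   }
   ```

   A predicate like this could be evaluated next to the
   `isBucketedHashAggEnabled` check so that the regulator and the translator
   agree on when the pattern is allowed.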



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java:
##########
@@ -217,6 +223,14 @@ private boolean onePhaseAggWithDistribute(PhysicalHashAggregate<? extends Plan>
                 && children.get(0).getPlan() instanceof PhysicalDistribute;
     }
 
+    /**
+     * Check whether bucketed hash aggregation allows the one-phase GLOBAL + distribute
+     * pattern. Delegates to the shared eligibility check in AggregateUtils.
+     */
+    private boolean shouldAllowForBucketedAgg(PhysicalHashAggregate<? extends Plan> aggregate) {

Review Comment:
   This method is defined but never called anywhere in the codebase. It appears 
to be dead code left over from the refactoring. Either wire it into 
`shouldBanOnePhaseAgg` (replacing the inline 
`AggregateUtils.isBucketedHashAggEnabled` call at line 164) or remove it.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java:
##########
@@ -3068,9 +2988,125 @@ private PlanFragment connectJoinNode(HashJoinNode hashJoinNode, PlanFragment lef
         return leftFragment;
     }
 
+    /**
+     * Check whether the one-phase GLOBAL hash aggregate can be fused with its
+     * distribute child into a BucketedAggregationNode. This eliminates exchange
+     * overhead on single-BE deployments by using in-memory per-bucket merging.
+     */
+    private boolean shouldUseBucketedFusion(PhysicalHashAggregate<? extends Plan> aggregate) {
+        // Shared eligibility: session var, single-BE, GROUP BY, smooth upgrade
+        if (!AggregateUtils.isBucketedHashAggEnabled(aggregate.getGroupByExpressions().size())) {
+            return false;
+        }
+        // Must be one-phase: GLOBAL + INPUT_TO_RESULT
+        if (aggregate.getAggPhase() != AggPhase.GLOBAL
+                || aggregate.getAggMode() != AggMode.INPUT_TO_RESULT) {
+            return false;
+        }
+        // Child must be PhysicalDistribute with hash distribution matching group keys
+        Plan child = aggregate.child(0);
+        if (!(child instanceof PhysicalDistribute)) {
+            return false;
+        }
+        DistributionSpec distSpec = ((PhysicalDistribute<?>) child).getDistributionSpec();
+        if (!(distSpec instanceof DistributionSpecHash)) {
+            return false;
+        }
+        List<ExprId> distKeys = ((DistributionSpecHash) distSpec).getOrderedShuffledColumns();
+        List<ExprId> groupByKeys = aggregate.getGroupByExpressions().stream()

Review Comment:
   The `groupByKeys` list filters to only `SlotReference` instances via 
`.filter(SlotReference.class::isInstance)`. If any group-by expression is a 
CAST-wrapped slot (e.g., `CAST(a AS BIGINT)` where `a` is INT), it will be 
silently excluded from the comparison, causing `distKeys.equals(groupByKeys)` 
to fail and the fusion to be skipped.
   
   This is conservative and safe (no incorrect fusion), but it means fusion 
opportunities are silently missed when the group-by uses CAST expressions. 
Consider either:
   1. Adding logic to unwrap CAST expressions to extract the underlying 
SlotReference, or
   2. Adding a log/debug message when fusion is skipped due to 
non-SlotReference group-by expressions.
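
   A minimal sketch of option 1, using hypothetical stand-in types (`Expr`,
   `Slot`, `Cast`) rather than the real Nereids expression classes. It only
   shows the unwrapping mechanics; whether fusing on the underlying slot is
   semantically safe depends on whether the distribution hashes the same
   value, which this sketch does not check.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Optional;

   /** Hypothetical mini expression model; not the Nereids classes. */
   final class GroupKeyUnwrapSketch {
       interface Expr {}

       static final class Slot implements Expr {
           final int exprId;
           Slot(int exprId) { this.exprId = exprId; }
       }

       static final class Cast implements Expr {
           final Expr child;
           Cast(Expr child) { this.child = child; }
       }

       /** Strips any number of nested casts and returns the slot id, if there is one. */
       static Optional<Integer> slotIdOf(Expr expr) {
           Expr current = expr;
           while (current instanceof Cast) {
               current = ((Cast) current).child;
           }
           return current instanceof Slot
                   ? Optional.of(((Slot) current).exprId)
                   : Optional.empty();
       }

       /**
        * Returns the slot ids of all group-by keys in order, or empty when any key
        * is not a (possibly cast-wrapped) slot, so the caller can log and skip fusion.
        */
       static Optional<List<Integer>> groupBySlotIds(List<Expr> groupBy) {
           List<Integer> ids = new ArrayList<>();
           for (Expr expr : groupBy) {
               Optional<Integer> id = slotIdOf(expr);
               if (!id.isPresent()) {
                   return Optional.empty();
               }
               ids.add(id.get());
           }
           return Optional.of(ids);
       }
   }
   ```

   Returning an empty result instead of silently dropping the key also gives a
   natural place for the debug message from option 2. Note that this is
   stricter than the current filter, which drops non-slot keys and compares
   the rest.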



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModel.java:
##########
@@ -361,25 +362,25 @@ public Cost visitPhysicalHashAggregate(
                     inputStatistics.getRowCount() / beNumber, 0);
         } else {
             int factor = aggregate.getGroupByExpressions().isEmpty() ? 1 : beNumber;
-            // global
+            double rowCost = inputStatistics.getRowCount() / factor;
+
+            // Bucketed fusion discount: when the one-phase GLOBAL INPUT_TO_RESULT
+            // aggregate is generated with a PhysicalDistribute child on single BE,
+            // the translator will fuse them into a BucketedAggregationNode.
+            // Apply a discount to make this path preferred over two-phase
+            // (local+global) aggregation which requires extra serialize/deserialize.
+            if (aggregate.getAggMode() == AggMode.INPUT_TO_RESULT
+                    && beNumber == 1
+                    && AggregateUtils.isBucketedHashAggEnabled(aggregate.getGroupByExpressions().size())
+                    && inputStatistics.getRowCount()
+                        >= context.getSessionVariable().bucketedAggMinInputRows) {

Review Comment:
   The discount factor `0.5` (50% off) is hardcoded. This makes the one-phase 
path always half the cost of two-phase on single-BE when eligible. Consider:
   1. Extracting this to a named constant (e.g., `BUCKETED_AGG_COST_DISCOUNT`).
   2. Making it a session variable so it can be tuned without a code change. 
This would provide an escape hatch if the discount proves too aggressive in 
production.
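
   A minimal sketch of both suggestions together. The class name and the
   `discount` parameter are hypothetical, and the sketch assumes the discount
   is applied by multiplying `rowCost`; a session variable would simply supply
   `discount` in place of the default constant.

   ```java
   /** Hypothetical stand-in for the discounted row-cost computation; not Doris code. */
   final class BucketedAggCostSketch {
       /** Named default replacing the inline 0.5 literal. */
       static final double BUCKETED_AGG_COST_DISCOUNT = 0.5;

       /**
        * Computes the per-BE row cost and applies the discount only when the
        * bucketed fusion path is eligible. Assumption: the discount is a plain
        * multiplier on the row cost.
        */
       static double rowCost(double inputRows, int factor, boolean fusionEligible, double discount) {
           double rowCost = inputRows / factor;
           return fusionEligible ? rowCost * discount : rowCost;
       }
   }
   ```

   Setting the multiplier to `1.0` would then disable the preference without a
   code change.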



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/util/AggregateUtils.java:
##########
@@ -191,4 +193,46 @@ public static boolean isOrderKeysMatchGroupKeys(List<OrderKey> orderKeys,
         }
         return true;
     }
+
+    /**
+     * Check the basic environmental conditions for bucketed hash aggregation.
+     * This is the shared eligibility gate used by ChildrenPropertiesRegulator
+     * (to allow the one-phase-GLOBAL+distribute pattern), CostModel (for cost
+     * discount), and PhysicalPlanTranslator (for fusion into BucketedAggregationNode).
+     *
+     * @return true if the session variable is enabled, there is exactly one alive BE,
+     *         no smooth upgrade is in progress, and the aggregate has GROUP BY keys.
+     */
+    public static boolean isBucketedHashAggEnabled(int groupByExprCount) {
+        ConnectContext ctx = ConnectContext.get();
+        if (ctx == null) {
+            return false;
+        }
+        if (!ctx.getSessionVariable().enableBucketedHashAgg) {
+            return false;
+        }
+        // Must have GROUP BY keys (without-key aggregation not supported)
+        if (groupByExprCount == 0) {
+            return false;
+        }
+        // Correctness gate: single-BE only (cross-BE in-memory merge is impossible).
+        // Use be_number_for_test first (set by regression tests), fall back to real cluster count.
+        int beNumber = ctx.getSessionVariable().getBeNumberForTest();
+        if (beNumber < 0) {
+            beNumber = Math.max(1, ctx.getEnv().getClusterInfo().getBackendsNumber(true));

Review Comment:
   The `beNumber` computation clamps the cluster count with `Math.max(1, ...)`,
so if `getBackendsNumber(true)` returns 0 (not expected in a healthy cluster,
but possible in edge cases) it is treated as 1 and bucketed agg is enabled.
Consider using the raw value and letting the `beNumber != 1` check handle the
zero case: with 0 alive BEs, bucketed agg should not be enabled.
   
   Also, the test override here checks `beNumber < 0` (negative means "not
set"), while the `CostModel` constructor checks `getBeNumberForTest() != -1`.
The two agree only as long as `-1` is the sole negative value ever used as the
sentinel, and the inconsistent check is slightly confusing. Consider using the
same convention in both places.
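
   A minimal sketch of the suggested shape, with hypothetical names and plain
   `int` inputs standing in for the session variable and the cluster info. It
   assumes `-1` is the only "not set" sentinel, as in `CostModel`.

   ```java
   /** Hypothetical stand-in for the backend-count logic; not Doris code. */
   final class BackendCountSketch {
       /** Single sentinel meaning "be_number_for_test is not set" (assumed to be -1). */
       static final int BE_NUMBER_NOT_SET = -1;

       /** Returns the test override when set, otherwise the raw alive-backend count, unclamped. */
       static int resolveBeNumber(int beNumberForTest, int aliveBackends) {
           return beNumberForTest != BE_NUMBER_NOT_SET ? beNumberForTest : aliveBackends;
       }

       /** Exactly one backend qualifies; zero alive backends does not. */
       static boolean isSingleBackend(int beNumberForTest, int aliveBackends) {
           return resolveBeNumber(beNumberForTest, aliveBackends) == 1;
       }
   }
   ```

   Sharing one helper like this between `AggregateUtils` and `CostModel` would
   also keep the sentinel convention in a single place.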



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java:
##########
@@ -3068,9 +2988,125 @@ private PlanFragment connectJoinNode(HashJoinNode hashJoinNode, PlanFragment lef
         return leftFragment;
     }
 
+    /**
+     * Check whether the one-phase GLOBAL hash aggregate can be fused with its
+     * distribute child into a BucketedAggregationNode. This eliminates exchange
+     * overhead on single-BE deployments by using in-memory per-bucket merging.
+     */
+    private boolean shouldUseBucketedFusion(PhysicalHashAggregate<? extends Plan> aggregate) {
+        // Shared eligibility: session var, single-BE, GROUP BY, smooth upgrade
+        if (!AggregateUtils.isBucketedHashAggEnabled(aggregate.getGroupByExpressions().size())) {
+            return false;
+        }
+        // Must be one-phase: GLOBAL + INPUT_TO_RESULT
+        if (aggregate.getAggPhase() != AggPhase.GLOBAL
+                || aggregate.getAggMode() != AggMode.INPUT_TO_RESULT) {
+            return false;
+        }
+        // Child must be PhysicalDistribute with hash distribution matching group keys
+        Plan child = aggregate.child(0);
+        if (!(child instanceof PhysicalDistribute)) {
+            return false;
+        }
+        DistributionSpec distSpec = ((PhysicalDistribute<?>) child).getDistributionSpec();
+        if (!(distSpec instanceof DistributionSpecHash)) {
+            return false;
+        }
+        List<ExprId> distKeys = ((DistributionSpecHash) distSpec).getOrderedShuffledColumns();
+        List<ExprId> groupByKeys = aggregate.getGroupByExpressions().stream()
+                .filter(SlotReference.class::isInstance)
+                .map(SlotReference.class::cast)
+                .map(SlotReference::getExprId)
+                .collect(Collectors.toList());
+        return distKeys.equals(groupByKeys);
+    }
+
+    /**
+     * Fuse a one-phase GLOBAL hash aggregate and its PhysicalDistribute child
+     * into a BucketedAggregationNode, skipping the exchange node entirely.
+     * Visits the distribute's child directly to keep everything in one fragment.
+     */
+    private PlanFragment visitBucketedFusion(

Review Comment:
   `visitBucketedFusion` duplicates a significant amount of code from the 
standard `visitPhysicalHashAggregate` path (group-by slot collection, agg 
function translation, output tuple building). The old 
`visitPhysicalBucketedHashAggregate` had the same duplication, so it's 
pre-existing. However, now that both paths live in the same class (rather than 
separate visitor methods), consider extracting the shared logic into a helper 
method (e.g., `buildBucketedAggInfo`) to reduce future maintenance burden. The 
PR description mentions this as a benefit of the refactor — actually delivering 
on it would make the code even cleaner.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

