BiteTheDDDDt commented on code in PR #61495:
URL: https://github.com/apache/doris/pull/61495#discussion_r3033701211
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggWithoutDistinct.java:
##########
@@ -165,6 +183,285 @@ public Void visitSessionVarGuardExpr(SessionVarGuardExpr
expr, Map<String, Strin
aggregate.getLogicalProperties(), localAgg));
}
+ /**
+ * Implements bucketed hash aggregation for single-BE deployments.
+ * Fuses two-phase aggregation into a single PhysicalBucketedHashAggregate
operator,
+ * eliminating exchange overhead and serialization/deserialization costs.
+ *
+ * Only generated when:
+ * 1. enable_bucketed_hash_agg session variable is true
+ * 2. Cluster has exactly one alive BE
+ * 3. Aggregate has GROUP BY keys (no without-key aggregation)
+ * 4. Aggregate functions support two-phase execution
+ * 5. Data volume checks pass (min input rows, max group keys)
+ */
+ private List<Plan> implementBucketedPhase(LogicalAggregate<? extends Plan>
aggregate, ConnectContext ctx) {
+ if (!ctx.getSessionVariable().enableBucketedHashAgg) {
+ return ImmutableList.of();
+ }
+ // Only for single-BE deployments
+ int beNumber = ctx.getEnv().getClusterInfo().getBackendsNumber(true);
+ if (beNumber != 1) {
+ return ImmutableList.of();
+ }
+ // Without-key aggregation not supported in initial version
+ if (aggregate.getGroupByExpressions().isEmpty()) {
+ return ImmutableList.of();
+ }
+ // Must support two-phase execution (same check as splitTwoPhase)
+ if (!aggregate.supportAggregatePhase(AggregatePhase.TWO)) {
+ return ImmutableList.of();
+ }
+ // Skip aggregates with no aggregate functions (pure GROUP BY dedup).
+ // These are produced by DistinctAggregateRewriter as the bottom dedup
phase.
+ if (aggregate.getAggregateFunctions().isEmpty()) {
+ return ImmutableList.of();
+ }
+ // Skip aggregates containing multi-distinct functions (e.g.,
multi_distinct_count,
+ // multi_distinct_sum). These are semantically distinct aggregations
rewritten by
+ // DistinctAggregateRewriter — they embed deduplication in the
BE-level function.
+ // The bucketed agg cost model does not account for deduplication
overhead, which
+ // causes the base-table bucketed path to appear artificially cheap
compared to
+ // materialized views using pre-aggregated bitmap_union/hll_union.
+ for (AggregateFunction func : aggregate.getAggregateFunctions()) {
+ if (func instanceof MultiDistinction) {
+ return ImmutableList.of();
+ }
+ }
+ // Skip aggregates whose child group contains a LogicalAggregate.
+ // This detects the top aggregate in a DISTINCT decomposition (e.g.,
+ // COUNT(DISTINCT a) GROUP BY b is rewritten to COUNT(a) GROUP BY b
+ // on top of GROUP BY a,b dedup). Bucketed agg does not support
+ // DISTINCT aggregation in the initial version.
+ if (childGroupContainsAggregate(aggregate)) {
+ return ImmutableList.of();
+ }
+ // Skip when sortByGroupKey optimization applies. This is detected by
+ // checking if the aggregate's owner group has a LogicalTopN parent
+ // whose order key expressions equal the group-by keys (produced by
+ // LimitAggToTopNAgg rewrite). PhysicalBucketedHashAggregate does not
+ // support sortByGroupKey, so we yield to the regular hash-agg plan.
+ if (hasSortByGroupKeyTopN(aggregate)) {
+ return ImmutableList.of();
+ }
+ // Skip when data is already distributed by the GROUP BY keys
+ // (e.g., table bucketed by UserID, query GROUP BY UserID).
+ // In this case the two-phase plan needs no exchange and is strictly
+ // better than bucketed agg (no 256-bucket overhead, no merge phase).
+ if (groupByKeysSatisfyDistribution(aggregate)) {
+ return ImmutableList.of();
+ }
+ // Data-volume-based checks: control bucketed agg eligibility based on
+ // estimated data scale, similar to ClickHouse's
group_by_two_level_threshold
+ // and group_by_two_level_threshold_bytes. This reduces reliance on
+ // column-level statistics which may be inaccurate or missing.
+ //
+ // When statistics are unavailable (groupExpression absent or
childStats null),
+ // conservatively skip bucketed agg — without data volume information
we cannot
+ // make an informed decision, and the risk of choosing bucketed agg in
a
+ // high-cardinality scenario outweighs the potential benefit.
+ if (!aggregate.getGroupExpression().isPresent()) {
+ return ImmutableList.of();
+ }
+ GroupExpression ge = aggregate.getGroupExpression().get();
+ Statistics childStats = ge.childStatistics(0);
+ if (childStats == null) {
+ return ImmutableList.of();
+ }
+ double rows = childStats.getRowCount();
+ long minInputRows = ctx.getSessionVariable().bucketedAggMinInputRows;
+ long maxGroupKeys = ctx.getSessionVariable().bucketedAggMaxGroupKeys;
+
+ // Gate: minimum input rows.
+ // When input data is too small, the overhead of initializing 256
+ // per-bucket hash tables and the pipelined merge phase outweighs
+ // the benefit of eliminating exchange. Skip bucketed agg.
+ if (minInputRows > 0 && rows < minInputRows) {
+ return ImmutableList.of();
+ }
+
+ // Gate: maximum estimated group keys (similar to ClickHouse's
+ // group_by_two_level_threshold). When the number of distinct groups
+ // is too large, the source-side merge must combine too many keys
+ // across instances, and the merge cost dominates. Skip bucketed agg.
+ Statistics aggStats = ge.getOwnerGroup().getStatistics();
+ if (maxGroupKeys > 0 && aggStats != null && aggStats.getRowCount() >
maxGroupKeys) {
+ return ImmutableList.of();
+ }
+
+ // High-cardinality ratio checks (existing logic).
+ // These complement the absolute thresholds above with relative checks:
+ // 1. Single-column NDV check: if ANY GROUP BY key's NDV > rows *
threshold,
+ // the combined NDV is at least that high.
+ // 2. Aggregation ratio check: if estimated output rows > rows *
threshold,
+ // merge cost dominates.
+ double highCardThreshold = 0.3;
+ for (Expression groupByKey : aggregate.getGroupByExpressions()) {
+ ColumnStatistic colStat =
childStats.findColumnStatistics(groupByKey);
+ if (colStat != null && !colStat.isUnKnown() && colStat.ndv > rows
* highCardThreshold) {
+ return ImmutableList.of();
+ }
+ }
+ if (aggStats != null && aggStats.getRowCount() > rows *
highCardThreshold) {
+ return ImmutableList.of();
+ }
+ // Build output expressions: rewrite AggregateFunction ->
AggregateExpression with GLOBAL_RESULT param
+ // (same as one-phase aggregation — raw input directly produces final
result).
+ List<NamedExpression> aggOutput =
ExpressionUtils.rewriteDownShortCircuit(
+ aggregate.getOutputExpressions(), expr -> {
+ if (!(expr instanceof AggregateFunction)) {
+ return expr;
+ }
+ return new AggregateExpression((AggregateFunction) expr,
AggregateParam.GLOBAL_RESULT);
+ }
+ );
+ return ImmutableList.of(new PhysicalBucketedHashAggregate<>(
+ aggregate.getGroupByExpressions(), aggOutput,
+ aggregate.getLogicalProperties(), aggregate.child()));
+ }
+
+ /**
+ * Check if the child group of this aggregate contains a LogicalAggregate.
+ * This is used to detect aggregates produced by DISTINCT decomposition
rewrites
+ * (e.g., DistinctAggregateRewriter, SplitMultiDistinctStrategy), where
the original
+ * DISTINCT aggregate is split into a top non-distinct aggregate over a
bottom dedup aggregate.
+ */
+ private boolean childGroupContainsAggregate(LogicalAggregate<? extends
Plan> aggregate) {
+ if (!aggregate.getGroupExpression().isPresent()) {
+ return false;
+ }
+ GroupExpression groupExpr = aggregate.getGroupExpression().get();
+ if (groupExpr.arity() == 0) {
+ return false;
+ }
+ Group childGroup = groupExpr.child(0);
+ for (GroupExpression childGroupExpr :
childGroup.getLogicalExpressions()) {
+ if (childGroupExpr.getPlan() instanceof LogicalAggregate) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Check if a LogicalTopN parent exists whose order keys are identical to
+ * the aggregate's group-by keys. This means PushTopnToAgg will later set
+ * sortByGroupKey on PhysicalHashAggregate; bucketed agg doesn't support
+ * that optimization so we skip it.
+ *
+ * Handles both TopN->Agg and TopN->Project->Agg patterns.
+ */
+ private boolean hasSortByGroupKeyTopN(LogicalAggregate<? extends Plan>
aggregate) {
+ if (!aggregate.getGroupExpression().isPresent()) {
+ return false;
+ }
+ List<Expression> groupByKeys = aggregate.getGroupByExpressions();
+ Group ownerGroup =
aggregate.getGroupExpression().get().getOwnerGroup();
+ for (GroupExpression parentGE :
ownerGroup.getParentGroupExpressions()) {
+ Plan parentPlan = parentGE.getPlan();
+ if (parentPlan instanceof LogicalTopN
+ && orderKeysMatchGroupKeys((LogicalTopN<?>) parentPlan,
groupByKeys)) {
+ return true;
+ }
+ if (parentPlan instanceof LogicalProject &&
parentGE.getOwnerGroup() != null) {
+ for (GroupExpression gpGE :
parentGE.getOwnerGroup().getParentGroupExpressions()) {
+ if (gpGE.getPlan() instanceof LogicalTopN
+ && orderKeysMatchGroupKeys((LogicalTopN<?>)
gpGE.getPlan(), groupByKeys)) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ private boolean orderKeysMatchGroupKeys(LogicalTopN<?> topN,
List<Expression> groupByKeys) {
+ List<OrderKey> orderKeys = topN.getOrderKeys();
+ if (orderKeys.size() != groupByKeys.size()) {
+ return false;
+ }
+ for (int i = 0; i < groupByKeys.size(); i++) {
+ if (!groupByKeys.get(i).equals(orderKeys.get(i).getExpr())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Check if the GROUP BY keys of this aggregate are a superset of (or
equal to)
+ * the underlying OlapTable's hash distribution columns. When this is true,
+ * the data is already correctly partitioned for the aggregation, so the
+ * two-phase plan (local + global) requires no exchange and is strictly
better
+ * than bucketed agg (no 256-bucket overhead, no merge phase).
+ *
+ * Traverses the child group in the Memo to find a LogicalOlapScan,
+ * walking through LogicalProject and LogicalFilter transparently.
+ */
+ private boolean groupByKeysSatisfyDistribution(LogicalAggregate<? extends
Plan> aggregate) {
+ if (!aggregate.getGroupExpression().isPresent()) {
+ return false;
+ }
+ GroupExpression groupExpr = aggregate.getGroupExpression().get();
+ if (groupExpr.arity() == 0) {
+ return false;
+ }
+ OlapTable table = findOlapTableInGroup(groupExpr.child(0), 5);
Review Comment:
现状:
- DistributionSpecAny 是在每个 LogicalXxxToPhysicalXxx 转换规则里硬编码的
- 只有 LogicalOlapScanToPhysicalOlapScan 会通过 table.getDefaultDistributionInfo() 查询元数据
- 外部表 (FileScan, HudiScan, EsScan) 全部硬编码 DistributionSpecAny
- JdbcScan, OdbcScan, SchemaScan 连 distributionSpec 字段都没有
- 没有 PhysicalCatalogRelation.getDistributionSpec() 这样的抽象方法

本质问题: PhysicalOlapScan 和 PhysicalFileScan 各自有 getDistributionSpec() 方法,但这不是从基类继承的 — 是各自独立定义的。代码库里需要判断分布时都是用 instanceof 检查的。

对于我们 bucketed agg 的代码: 当前 instanceof LogicalOlapScan 的做法和代码库其他地方一致。理想的改进是在基类加抽象方法,但那是个更大的重构。
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java:
##########
@@ -223,6 +224,14 @@ public PhysicalProperties visitPhysicalHashAggregate(
}
}
+ @Override
+ public PhysicalProperties visitPhysicalBucketedHashAggregate(
+ PhysicalBucketedHashAggregate<? extends Plan> agg, PlanContext
context) {
+ Preconditions.checkState(childrenOutputProperties.size() == 1);
+ PhysicalProperties childOutputProperty =
childrenOutputProperties.get(0);
+ return new
PhysicalProperties(childOutputProperty.getDistributionSpec());
Review Comment:
这个地方,虽然 bucket agg 返回的是正交的数据,但是因为 hash 方式跟 exchange 算子不一样,所以应该不能当作是 execution_bucket 的分布。
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]