Copilot commented on code in PR #61395:
URL: https://github.com/apache/doris/pull/61395#discussion_r2940711064


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggWithoutDistinct.java:
##########
@@ -165,6 +167,48 @@ public Void visitSessionVarGuardExpr(SessionVarGuardExpr 
expr, Map<String, Strin
                 aggregate.getLogicalProperties(), localAgg));
     }
 
+    /**
+     * Implements bucketed hash aggregation for single-BE deployments.
+     * Fuses two-phase aggregation into a single PhysicalBucketedHashAggregate 
operator,
+     * eliminating exchange overhead and serialization/deserialization costs.
+     *
+     * Only generated when:
+     * 1. enable_bucketed_hash_agg session variable is true
+     * 2. Cluster has exactly one alive BE
+     * 3. Aggregate has GROUP BY keys (no without-key aggregation)
+     * 4. Aggregate functions support two-phase execution
+     */
+    private List<Plan> implementBucketedPhase(LogicalAggregate<? extends Plan> 
aggregate, ConnectContext ctx) {
+        if (!ctx.getSessionVariable().enableBucketedHashAgg) {
+            return ImmutableList.of();
+        }
+        // Only for single-BE deployments
+        int beNumber = Math.max(1, 
ctx.getEnv().getClusterInfo().getBackendsNumber(true));

Review Comment:
   Using `Math.max(1, ...)` makes `beNumber` become `1` even when there are `0` 
alive BEs, which can incorrectly generate a bucketed-agg candidate in an 
invalid cluster state. Consider removing the `Math.max(1, ...)` and returning 
`ImmutableList.of()` when the actual alive-BE count is not exactly `1` 
(including `0`).
   



##########
be/src/exec/pipeline/dependency.h:
##########
@@ -419,11 +419,166 @@ struct AggSharedState : public BasicSharedState {
     void _destroy_agg_status(AggregateDataPtr data);
 };
 
+static constexpr int BUCKETED_AGG_NUM_BUCKETS = 256;
+
+/// Shared state for BucketedAggSinkOperatorX / BucketedAggSourceOperatorX.
+///
+/// Each sink pipeline instance owns 256 per-bucket hash tables (two-level 
hash table
+/// approach, inspired by ClickHouse). During sink, each row is routed to 
bucket
+/// (hash >> 24) & 0xFF.
+///
+/// Source-side merge is pipelined with sink completion: as each sink instance 
finishes,
+/// it unblocks all source dependencies. Source instances scan buckets and 
merge data
+/// from finished sink instances into the merge target (the first sink to 
finish).
+/// Each bucket has a CAS lock so only one source works on a bucket at a time.
+/// After all sinks finish and all buckets are merged + output, one source 
handles
+/// null key merge and the pipeline completes.
+///
+/// Thread safety model:
+///  - Sink phase: each instance writes only to its own 
per_instance_data[task_idx]. No locking.
+///  - Source phase: per-bucket CAS lock (merge_in_progress). Under the lock, 
a source
+///    scans all finished sink instances and merges their bucket data into the 
merge
+///    target's bucket. Already-merged entries are nulled out to prevent 
re-processing.
+///    Output is only done when all sinks have finished and the bucket is 
fully merged.
+struct BucketedAggSharedState : public BasicSharedState {
+    ENABLE_FACTORY_CREATOR(BucketedAggSharedState)
+public:
+    BucketedAggSharedState() = default;
+    ~BucketedAggSharedState() override { _close(); }
+
+    /// Per-instance data. One per sink pipeline instance.
+    /// Each instance has 256 bucket hash tables + 1 shared arena.
+    struct PerInstanceData {
+        /// 256 per-bucket hash tables. Each bucket has its own 
AggregatedDataVariants.
+        std::vector<AggregatedDataVariantsUPtr> bucket_agg_data;
+        ArenaUPtr arena;
+
+        PerInstanceData() : arena(std::make_unique<Arena>()) {
+            bucket_agg_data.resize(BUCKETED_AGG_NUM_BUCKETS);
+            for (auto& p : bucket_agg_data) {
+                p = std::make_unique<AggregatedDataVariants>();
+            }
+        }
+    };
+
+    /// Per-bucket merge state for pipelined source-side processing.
+    struct BucketMergeState {
+        /// CAS lock: only one source instance can merge/output this bucket at 
a time.
+        std::atomic<bool> merge_in_progress {false};
+        /// Set to true once the bucket is fully merged and all rows have been 
output.
+        std::atomic<bool> output_done {false};
+        /// Bitmask tracking which sink instances have been merged into the 
merge target
+        /// for this bucket. Accessed only under merge_in_progress CAS lock.
+        /// Bit i is set when instance i's data for this bucket has been 
merged.
+        /// Supports up to 64 instances (pipeline parallelism rarely exceeds 
this).
+        uint64_t merged_instances_mask = 0;

Review Comment:
   `merged_instances_mask` hard-limits merge tracking to 64 sink instances; if 
`_num_instances` can exceed 64, `1ULL << inst_idx` will overflow/UB and the 
merge correctness breaks. Please add an explicit guard/fallback (e.g., reject 
bucketed agg when instances > 64, or replace the mask with a dynamically-sized 
bitmap/vector) so correctness does not depend on an assumed parallelism cap.
   



##########
fe/fe-core/src/main/java/org/apache/doris/planner/BucketedAggregationNode.java:
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.analysis.AggregateInfo;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.FunctionCallExpr;
+import org.apache.doris.thrift.TBucketedAggregationNode;
+import org.apache.doris.thrift.TExplainLevel;
+import org.apache.doris.thrift.TExpr;
+import org.apache.doris.thrift.TPlanNode;
+import org.apache.doris.thrift.TPlanNodeType;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Bucketed hash aggregation node.
+ *
+ * Fuses two-phase aggregation (local + global) into a single BE operator for 
single-BE deployments.
+ * Produces a BUCKETED_AGGREGATION_NODE in the Thrift plan, which the BE maps 
to
+ * BucketedAggSinkOperatorX / BucketedAggSourceOperatorX.
+ */
+public class BucketedAggregationNode extends PlanNode {
+    private final AggregateInfo aggInfo;
+    private final boolean needsFinalize;
+
+    public BucketedAggregationNode(PlanNodeId id, PlanNode input, 
AggregateInfo aggInfo,
+            boolean needsFinalize) {
+        super(id, aggInfo.getOutputTupleId().asList(), "BUCKETED AGGREGATE");
+        this.aggInfo = aggInfo;
+        this.needsFinalize = needsFinalize;
+        this.children.add(input);
+    }
+
+    @Override
+    protected void toThrift(TPlanNode msg) {
+        msg.node_type = TPlanNodeType.BUCKETED_AGGREGATION_NODE;
+
+        List<TExpr> aggregateFunctions = Lists.newArrayList();
+        for (FunctionCallExpr e : aggInfo.getMaterializedAggregateExprs()) {
+            aggregateFunctions.add(e.treeToThrift());
+        }
+
+        List<TExpr> groupingExprs = Lists.newArrayList();
+        if (aggInfo.getGroupingExprs() != null) {
+            groupingExprs = Expr.treesToThrift(aggInfo.getGroupingExprs());
+        }
+
+        TBucketedAggregationNode bucketedAggNode = new 
TBucketedAggregationNode();
+        bucketedAggNode.setGroupingExprs(groupingExprs);
+        bucketedAggNode.setAggregateFunctions(aggregateFunctions);
+        
bucketedAggNode.setIntermediateTupleId(aggInfo.getOutputTupleId().asInt());

Review Comment:
   The Thrift payload distinguishes `intermediate_tuple_id` vs 
`output_tuple_id`, but the FE currently sets both to the same tuple. For many 
aggregate functions, intermediate (state/serialized) slots can differ from 
final output slots; using the output tuple for intermediate can lead to 
incorrect evaluator preparation and incorrect results at execution time. 
Recommend generating/using a proper intermediate tuple descriptor/id (with 
intermediate slot types) and setting `intermediate_tuple_id` accordingly, while 
keeping `output_tuple_id` as the final output tuple.
   



##########
be/src/exec/operator/bucketed_aggregation_source_operator.cpp:
##########
@@ -0,0 +1,856 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/operator/bucketed_aggregation_source_operator.h"
+
+#include <memory>
+#include <string>
+
+#include "common/exception.h"
+#include "exec/common/hash_table/hash.h"
+#include "exec/common/util.hpp"
+#include "exec/operator/operator.h"
+#include "exprs/vectorized_agg_fn.h"
+#include "runtime/runtime_profile.h"
+#include "runtime/thread_context.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+
+// Helper to set/get null key data on hash tables that support it 
(DataWithNullKey).
+// For hash tables without nullable key support (PHHashMap, StringHashMap), 
these are no-ops.
+// This is needed because in nested std::visit lambdas, the outer hash table 
type is already
+// resolved and doesn't depend on the inner template parameter, so `if 
constexpr` inside the
+// inner lambda cannot suppress compilation of code that accesses 
has_null_key_data() on the
+// outer (non-dependent) type.
+template <typename HashTable>
+constexpr bool has_nullable_key_v =
+        
std::is_assignable_v<decltype(std::declval<HashTable&>().has_null_key_data()), 
bool>;
+
+template <typename HashTable>
+void set_null_key_flag(HashTable& ht, bool val) {
+    if constexpr (has_nullable_key_v<HashTable>) {
+        ht.has_null_key_data() = val;
+    }
+}
+
+template <typename HashTable>
+bool get_null_key_flag(const HashTable& ht) {
+    return ht.has_null_key_data();
+}
+
+template <typename HashTable>
+AggregateDataPtr get_null_key_agg_data(HashTable& ht) {
+    if constexpr (has_nullable_key_v<HashTable>) {
+        return ht.template get_null_key_data<AggregateDataPtr>();
+    } else {
+        return nullptr;
+    }
+}
+
+template <typename HashTable>
+void set_null_key_agg_data(HashTable& ht, AggregateDataPtr val) {
+    if constexpr (has_nullable_key_v<HashTable>) {
+        ht.template get_null_key_data<AggregateDataPtr>() = val;
+    }
+}
+
+// Helper for emplace that works with both PHHashMap (3-arg) and 
StringHashTable (4-arg).
+// Uses SFINAE to detect which interface is available.
+template <typename HashTable, typename Key>
+auto hash_table_emplace(HashTable& ht, const Key& key, typename 
HashTable::LookupResult& it,
+                        bool& inserted) -> decltype(ht.emplace(key, it, 
inserted), void()) {
+    ht.emplace(key, it, inserted);
+}
+
+template <typename HashTable, typename Key, typename... Dummy>
+void hash_table_emplace(HashTable& ht, const Key& key, typename 
HashTable::LookupResult& it,
+                        bool& inserted, Dummy...) {
+    // StringHashTable requires a hash value. Compute it.
+    size_t hash_value = StringHashTableHash()(key);
+    ht.emplace(key, it, inserted, hash_value);
+}

Review Comment:
   The fallback overload is not constrained, and (because `Dummy...` can be 
empty) it can become viable alongside the SFINAE’d overload, risking ambiguous 
overload resolution or picking the wrong path for non-string hash tables. 
Please constrain the fallback overload to only participate when 
`ht.emplace(key, it, inserted)` is not available (e.g., via 
`requires`/`enable_if`), ensuring the intended interface selection is 
unambiguous and robust.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to