Copilot commented on code in PR #970:
URL: 
https://github.com/apache/skywalking-banyandb/pull/970#discussion_r2793129562


##########
pkg/query/logical/measure/measure_plan_aggregation.go:
##########
@@ -282,3 +349,114 @@ func (ami *aggAllIterator[N]) Current() 
[]*measurev1.InternalDataPoint {
 func (ami *aggAllIterator[N]) Close() error {
        return ami.prev.Close()
 }
+
+type reduceGroupIterator[N aggregation.Number] struct {
+       prev                executor.MIterator
+       aggregationFieldRef *logical.FieldRef
+       reduceFunc          aggregation.Reduce[N]
+       err                 error
+       groupMap            map[uint64][]*measurev1.InternalDataPoint
+       groupByTagsRefs     [][]*logical.TagRef
+       groupKeys           []uint64
+       aggrType            modelv1.AggregationFunction
+       index               int
+}
+
+func newReduceGroupIterator[N aggregation.Number](
+       prev executor.MIterator,
+       aggregationFieldRef *logical.FieldRef,
+       reduceFunc aggregation.Reduce[N],
+       aggrType modelv1.AggregationFunction,
+       groupByTagsRefs [][]*logical.TagRef,
+) executor.MIterator {
+       return &reduceGroupIterator[N]{
+               prev:                prev,
+               aggregationFieldRef: aggregationFieldRef,
+               reduceFunc:          reduceFunc,
+               aggrType:            aggrType,
+               groupByTagsRefs:     groupByTagsRefs,
+               groupMap:            
make(map[uint64][]*measurev1.InternalDataPoint),
+       }
+}
+
+func (rgi *reduceGroupIterator[N]) loadGroups() bool {
+       if rgi.groupKeys != nil {
+               return true
+       }
+       for rgi.prev.Next() {
+               group := rgi.prev.Current()
+               for _, idp := range group {
+                       key, keyErr := formatGroupByKey(idp.GetDataPoint(), 
rgi.groupByTagsRefs)
+                       if keyErr != nil {
+                               rgi.err = keyErr
+                               return false
+                       }
+                       rgi.groupMap[key] = append(rgi.groupMap[key], idp)
+               }
+       }
+       if closeErr := rgi.prev.Close(); closeErr != nil {
+               rgi.err = closeErr
+               return false
+       }

Review Comment:
   reduceGroupIterator.loadGroups closes rgi.prev, but 
reduceGroupIterator.Close() also calls rgi.prev.Close() again. If the 
underlying iterator Close is not idempotent, this can cause double-close errors 
or unintended side effects. Consider removing the close from loadGroups (and 
only closing in Close), or track a closed flag and make Close a no-op after the 
early close.
   ```suggestion
   
   ```



##########
pkg/query/logical/measure/measure_plan_aggregation.go:
##########
@@ -189,16 +223,30 @@ func (ami *aggGroupIterator[N]) Current() 
[]*measurev1.InternalDataPoint {
        if resultDp == nil {
                return nil
        }
-       val, err := aggregation.ToFieldValue(ami.aggrFunc.Val())
-       if err != nil {
-               ami.err = err
-               return nil
-       }
-       resultDp.Fields = []*measurev1.DataPoint_Field{
-               {
-                       Name:  ami.aggregationFieldRef.Field.Name,
-                       Value: val,
-               },
+       if ami.emitPartial {
+               part := ami.mapFunc.Partial()
+               fvs, err := aggregation.PartialToFieldValues(ami.aggrType, part)
+               if err != nil {
+                       ami.err = err
+                       return nil
+               }
+               resultDp.Fields = make([]*measurev1.DataPoint_Field, len(fvs))
+               for i, fv := range fvs {
+                       name := ami.aggregationFieldRef.Field.Name
+                       if i > 0 {
+                               name = "__agg_count"
+                       }
+                       resultDp.Fields[i] = &measurev1.DataPoint_Field{Name: 
name, Value: fv}
+               }

Review Comment:
   Partial aggregation encodes the extra MEAN count as a synthetic field named 
"__agg_count". This introduces an undocumented reserved field name and 
duplicates a magic string in multiple places, which can collide with real field 
names and is hard to maintain. Consider defining a constant for this reserved 
name and documenting/enforcing that it cannot conflict with user fields (or 
carry partials out-of-band instead of as normal fields).



##########
pkg/query/logical/measure/measure_plan_aggregation.go:
##########
@@ -282,3 +349,114 @@ func (ami *aggAllIterator[N]) Current() 
[]*measurev1.InternalDataPoint {
 func (ami *aggAllIterator[N]) Close() error {
        return ami.prev.Close()
 }
+
+type reduceGroupIterator[N aggregation.Number] struct {
+       prev                executor.MIterator
+       aggregationFieldRef *logical.FieldRef
+       reduceFunc          aggregation.Reduce[N]
+       err                 error
+       groupMap            map[uint64][]*measurev1.InternalDataPoint
+       groupByTagsRefs     [][]*logical.TagRef
+       groupKeys           []uint64
+       aggrType            modelv1.AggregationFunction
+       index               int
+}
+
+func newReduceGroupIterator[N aggregation.Number](
+       prev executor.MIterator,
+       aggregationFieldRef *logical.FieldRef,
+       reduceFunc aggregation.Reduce[N],
+       aggrType modelv1.AggregationFunction,
+       groupByTagsRefs [][]*logical.TagRef,
+) executor.MIterator {
+       return &reduceGroupIterator[N]{
+               prev:                prev,
+               aggregationFieldRef: aggregationFieldRef,
+               reduceFunc:          reduceFunc,
+               aggrType:            aggrType,
+               groupByTagsRefs:     groupByTagsRefs,
+               groupMap:            
make(map[uint64][]*measurev1.InternalDataPoint),
+       }
+}
+
+func (rgi *reduceGroupIterator[N]) loadGroups() bool {
+       if rgi.groupKeys != nil {
+               return true
+       }
+       for rgi.prev.Next() {
+               group := rgi.prev.Current()
+               for _, idp := range group {
+                       key, keyErr := formatGroupByKey(idp.GetDataPoint(), 
rgi.groupByTagsRefs)
+                       if keyErr != nil {
+                               rgi.err = keyErr
+                               return false
+                       }
+                       rgi.groupMap[key] = append(rgi.groupMap[key], idp)
+               }
+       }
+       if closeErr := rgi.prev.Close(); closeErr != nil {
+               rgi.err = closeErr
+               return false
+       }
+       rgi.groupKeys = make([]uint64, 0, len(rgi.groupMap))
+       for k := range rgi.groupMap {
+               rgi.groupKeys = append(rgi.groupKeys, k)
+       }

Review Comment:
   reduceGroupIterator builds rgi.groupKeys by ranging over a map, which yields 
a nondeterministic iteration order. This can make aggregation result ordering 
unstable and potentially flaky across runs. Consider preserving first-seen 
order (like groupBy.hash does via groupLst) or explicitly sorting groupKeys 
before iteration.



##########
pkg/query/logical/measure/measure_plan_aggregation.go:
##########
@@ -282,3 +349,114 @@ func (ami *aggAllIterator[N]) Current() 
[]*measurev1.InternalDataPoint {
 func (ami *aggAllIterator[N]) Close() error {
        return ami.prev.Close()
 }
+
+type reduceGroupIterator[N aggregation.Number] struct {
+       prev                executor.MIterator
+       aggregationFieldRef *logical.FieldRef
+       reduceFunc          aggregation.Reduce[N]
+       err                 error
+       groupMap            map[uint64][]*measurev1.InternalDataPoint
+       groupByTagsRefs     [][]*logical.TagRef
+       groupKeys           []uint64
+       aggrType            modelv1.AggregationFunction
+       index               int
+}
+
+func newReduceGroupIterator[N aggregation.Number](
+       prev executor.MIterator,
+       aggregationFieldRef *logical.FieldRef,
+       reduceFunc aggregation.Reduce[N],
+       aggrType modelv1.AggregationFunction,
+       groupByTagsRefs [][]*logical.TagRef,
+) executor.MIterator {
+       return &reduceGroupIterator[N]{
+               prev:                prev,
+               aggregationFieldRef: aggregationFieldRef,
+               reduceFunc:          reduceFunc,
+               aggrType:            aggrType,
+               groupByTagsRefs:     groupByTagsRefs,
+               groupMap:            
make(map[uint64][]*measurev1.InternalDataPoint),
+       }
+}
+
+func (rgi *reduceGroupIterator[N]) loadGroups() bool {
+       if rgi.groupKeys != nil {
+               return true
+       }
+       for rgi.prev.Next() {
+               group := rgi.prev.Current()
+               for _, idp := range group {
+                       key, keyErr := formatGroupByKey(idp.GetDataPoint(), 
rgi.groupByTagsRefs)
+                       if keyErr != nil {
+                               rgi.err = keyErr
+                               return false
+                       }
+                       rgi.groupMap[key] = append(rgi.groupMap[key], idp)
+               }

Review Comment:
   reduceGroupIterator.loadGroups buffers all incoming datapoints into 
rgi.groupMap before returning any results. This prevents streaming and can 
cause high memory usage for large aggregations (even if results are partially 
aggregated). Consider reducing per-group as data arrives (streaming combine) or 
at least bounding memory usage.



##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -310,7 +311,7 @@ func (t *distributedPlan) Execute(ctx context.Context) (mi 
executor.MIterator, e
                span.Tagf("response_count", "%d", responseCount)
                span.Tagf("data_point_count", "%d", dataPointCount)
        }
-       if t.needCompletePushDownAgg {
+       if t.pushDownAgg {
                deduplicatedDps, dedupErr := 
deduplicateAggregatedDataPointsWithShard(pushedDownAggDps, t.groupByTagsRefs)
                if dedupErr != nil {

Review Comment:
   When pushDownAgg is enabled and there is no GroupBy, t.groupByTagsRefs is 
empty and deduplicateAggregatedDataPointsWithShard currently performs no 
deduplication. In a replicated setup this can cause partials from multiple 
replicas of the same shard to be combined, producing incorrect aggregated 
results (double counting, etc.). Consider deduplicating by shard ID even when 
groupByTagsRefs is empty (e.g., treat groupKey as 0 and still hash with 
shard_id, or add a dedicated path for no-group-by aggregation).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to