hanahmily commented on code in PR #970:
URL: 
https://github.com/apache/skywalking-banyandb/pull/970#discussion_r2793211926


##########
api/proto/banyandb/measure/v1/query.proto:
##########
@@ -138,6 +138,8 @@ message QueryRequest {
   bool trace = 13;
   // stages is used to specify the stage of the data points in the lifecycle
   repeated string stages = 14;
-  // rewriteAggTopNResult will rewrite agg result to raw data
+  // rewrite_agg_top_n_result will rewrite agg result to raw data
   bool rewrite_agg_top_n_result = 15;
+  // agg_return_partial when true asks data nodes to return aggregation 
partials (for reduce at liaison)
+  bool agg_return_partial = 16;

Review Comment:
   Move this field to "InternalQueryRequest".



##########
pkg/query/logical/measure/measure_plan_aggregation.go:
##########
@@ -127,29 +153,37 @@ func (g *aggregationPlan[N]) Execute(ec context.Context) 
(executor.MIterator, er
        if err != nil {
                return nil, err
        }
+       if g.useReduceMode {

Review Comment:
   The reduce mode should also support aggregation without grouping.



##########
pkg/query/logical/measure/measure_plan_aggregation.go:
##########
@@ -282,3 +349,114 @@ func (ami *aggAllIterator[N]) Current() 
[]*measurev1.InternalDataPoint {
 func (ami *aggAllIterator[N]) Close() error {
        return ami.prev.Close()
 }
+
+type reduceGroupIterator[N aggregation.Number] struct {
+       prev                executor.MIterator
+       aggregationFieldRef *logical.FieldRef
+       reduceFunc          aggregation.Reduce[N]
+       err                 error
+       groupMap            map[uint64][]*measurev1.InternalDataPoint
+       groupByTagsRefs     [][]*logical.TagRef
+       groupKeys           []uint64
+       aggrType            modelv1.AggregationFunction
+       index               int
+}
+
+func newReduceGroupIterator[N aggregation.Number](
+       prev executor.MIterator,
+       aggregationFieldRef *logical.FieldRef,
+       reduceFunc aggregation.Reduce[N],
+       aggrType modelv1.AggregationFunction,
+       groupByTagsRefs [][]*logical.TagRef,
+) executor.MIterator {
+       return &reduceGroupIterator[N]{
+               prev:                prev,
+               aggregationFieldRef: aggregationFieldRef,
+               reduceFunc:          reduceFunc,
+               aggrType:            aggrType,
+               groupByTagsRefs:     groupByTagsRefs,
+               groupMap:            
make(map[uint64][]*measurev1.InternalDataPoint),
+       }
+}
+
+func (rgi *reduceGroupIterator[N]) loadGroups() bool {
+       if rgi.groupKeys != nil {
+               return true
+       }
+       for rgi.prev.Next() {
+               group := rgi.prev.Current()
+               for _, idp := range group {
+                       key, keyErr := formatGroupByKey(idp.GetDataPoint(), 
rgi.groupByTagsRefs)

Review Comment:
   The data is already grouped by measure_plan_groupby.go. Refer to 
aggGroupIterator to implement a reduce version.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to