hanahmily commented on code in PR #970:
URL:
https://github.com/apache/skywalking-banyandb/pull/970#discussion_r2793211926
##########
api/proto/banyandb/measure/v1/query.proto:
##########
@@ -138,6 +138,8 @@ message QueryRequest {
bool trace = 13;
// stages is used to specify the stage of the data points in the lifecycle
repeated string stages = 14;
- // rewriteAggTopNResult will rewrite agg result to raw data
+ // rewrite_agg_top_n_result will rewrite agg result to raw data
bool rewrite_agg_top_n_result = 15;
+ // agg_return_partial when true asks data nodes to return aggregation partials (for reduce at liaison)
+ bool agg_return_partial = 16;
Review Comment:
Move this field to "InternalQueryRequest"
##########
pkg/query/logical/measure/measure_plan_aggregation.go:
##########
@@ -127,29 +153,37 @@ func (g *aggregationPlan[N]) Execute(ec context.Context) (executor.MIterator, er
if err != nil {
return nil, err
}
+ if g.useReduceMode {
Review Comment:
The "reduce" should support aggregation without grouping.
##########
pkg/query/logical/measure/measure_plan_aggregation.go:
##########
@@ -282,3 +349,114 @@ func (ami *aggAllIterator[N]) Current() []*measurev1.InternalDataPoint {
func (ami *aggAllIterator[N]) Close() error {
return ami.prev.Close()
}
+
+type reduceGroupIterator[N aggregation.Number] struct {
+ prev executor.MIterator
+ aggregationFieldRef *logical.FieldRef
+ reduceFunc aggregation.Reduce[N]
+ err error
+ groupMap map[uint64][]*measurev1.InternalDataPoint
+ groupByTagsRefs [][]*logical.TagRef
+ groupKeys []uint64
+ aggrType modelv1.AggregationFunction
+ index int
+}
+
+func newReduceGroupIterator[N aggregation.Number](
+ prev executor.MIterator,
+ aggregationFieldRef *logical.FieldRef,
+ reduceFunc aggregation.Reduce[N],
+ aggrType modelv1.AggregationFunction,
+ groupByTagsRefs [][]*logical.TagRef,
+) executor.MIterator {
+ return &reduceGroupIterator[N]{
+ prev: prev,
+ aggregationFieldRef: aggregationFieldRef,
+ reduceFunc: reduceFunc,
+ aggrType: aggrType,
+ groupByTagsRefs: groupByTagsRefs,
+ groupMap: make(map[uint64][]*measurev1.InternalDataPoint),
+ }
+}
+
+func (rgi *reduceGroupIterator[N]) loadGroups() bool {
+ if rgi.groupKeys != nil {
+ return true
+ }
+ for rgi.prev.Next() {
+ group := rgi.prev.Current()
+ for _, idp := range group {
+ key, keyErr := formatGroupByKey(idp.GetDataPoint(), rgi.groupByTagsRefs)
Review Comment:
The data is already grouped by measure_plan_groupby.go. Refer to
aggGroupIterator to implement a reduce version.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]