This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new ab3de192 Push down sum agg to data node (#924)
ab3de192 is described below

commit ab3de19288c77701a38e46267ee98509e9d386c2
Author: OmCheeLin <[email protected]>
AuthorDate: Thu Jan 8 09:46:37 2026 +0800

    Push down sum agg to data node (#924)
    
    
    
    Co-authored-by: Gao Hongtao <[email protected]>
    Co-authored-by: 吴晟 Wu Sheng <[email protected]>
    Co-authored-by: Copilot <[email protected]>
---
 pkg/query/logical/measure/measure_analyzer.go      |  3 +-
 .../logical/measure/measure_plan_distributed.go    | 44 ++++++++++++++++-
 test/cases/measure/data/input/group_sum.ql         | 22 +++++++++
 test/cases/measure/data/input/group_sum.yaml       | 35 ++++++++++++++
 test/cases/measure/data/want/group_sum.yaml        | 55 ++++++++++++++++++++++
 test/cases/measure/measure.go                      |  1 +
 6 files changed, 158 insertions(+), 2 deletions(-)

diff --git a/pkg/query/logical/measure/measure_analyzer.go b/pkg/query/logical/measure/measure_analyzer.go
index 8932006c..85dcd7de 100644
--- a/pkg/query/logical/measure/measure_analyzer.go
+++ b/pkg/query/logical/measure/measure_analyzer.go
@@ -160,7 +160,8 @@ func DistributedAnalyze(criteria *measurev1.QueryRequest, ss []logical.Schema) (
        // TODO: to support all aggregation functions
        needCompletePushDownAgg := criteria.GetAgg() != nil &&
                (criteria.GetAgg().GetFunction() == modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX ||
-                       criteria.GetAgg().GetFunction() == modelv1.AggregationFunction_AGGREGATION_FUNCTION_MIN) &&
+                       criteria.GetAgg().GetFunction() == modelv1.AggregationFunction_AGGREGATION_FUNCTION_MIN ||
+                       criteria.GetAgg().GetFunction() == modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM) &&
                criteria.GetTop() == nil
 
        // parse fields
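
Editor's note: the hunk above widens the complete push-down check so that SUM joins MAX and MIN as aggregations that data nodes may compute in full, provided the query carries no TOP clause. Below is a minimal, self-contained sketch of that same decision; the AggFunc type, constant names, and canPushDownCompletely are illustrative stand-ins, not the real measurev1/modelv1 protobuf API.

    package main

    import "fmt"

    // AggFunc is a simplified stand-in for modelv1.AggregationFunction.
    type AggFunc int

    const (
        AggUnspecified AggFunc = iota
        AggMin
        AggMax
        AggSum
        AggCount
        AggMean
    )

    // canPushDownCompletely mirrors the shape of the check in DistributedAnalyze:
    // only MAX, MIN, and SUM are pushed down in full, and only when the query has
    // no TOP clause. Other functions still fall back to the non-pushdown path.
    func canPushDownCompletely(agg *AggFunc, hasTop bool) bool {
        if agg == nil || hasTop {
            return false
        }
        switch *agg {
        case AggMax, AggMin, AggSum:
            return true
        default:
            return false
        }
    }

    func main() {
        sum := AggSum
        mean := AggMean
        fmt.Println(canPushDownCompletely(&sum, false))  // true
        fmt.Println(canPushDownCompletely(&sum, true))   // false: TOP blocks push-down
        fmt.Println(canPushDownCompletely(&mean, false)) // false: not yet pushed down (see TODO above)
    }
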
diff --git a/pkg/query/logical/measure/measure_plan_distributed.go b/pkg/query/logical/measure/measure_plan_distributed.go
index bd3ebfcc..6f774c67 100644
--- a/pkg/query/logical/measure/measure_plan_distributed.go
+++ b/pkg/query/logical/measure/measure_plan_distributed.go
@@ -160,6 +160,17 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error)
                temp.Top = ud.originalQuery.Top
                temp.GroupBy = ud.originalQuery.GroupBy
        }
+       // Prepare groupBy tags refs if needed for deduplication
+       var groupByTagsRefs [][]*logical.TagRef
+       if ud.needCompletePushDownAgg && ud.originalQuery.GetGroupBy() != nil {
+               groupByTags := logical.ToTags(ud.originalQuery.GetGroupBy().GetTagProjection())
+               var err error
+               groupByTagsRefs, err = s.CreateTagRef(groupByTags...)
+               if err != nil {
+                       return nil, err
+               }
+       }
+
        if ud.groupByEntity {
                e := s.EntityList()[0]
                sortTagSpec := s.FindTagSpecByName(e)
@@ -172,6 +183,7 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error)
                        sortByTime:              false,
                        sortTagSpec:             *sortTagSpec,
                        needCompletePushDownAgg: ud.needCompletePushDownAgg,
+                       groupByTagsRefs:         groupByTagsRefs,
                }
                if ud.originalQuery.OrderBy != nil && ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC {
                        result.desc = true
@@ -184,6 +196,7 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error)
                        s:                       s,
                        sortByTime:              true,
                        needCompletePushDownAgg: ud.needCompletePushDownAgg,
+                       groupByTagsRefs:         groupByTagsRefs,
                }, nil
        }
        if ud.originalQuery.OrderBy.IndexRuleName == "" {
@@ -192,6 +205,7 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error)
                        s:                       s,
                        sortByTime:              true,
                        needCompletePushDownAgg: ud.needCompletePushDownAgg,
+                       groupByTagsRefs:         groupByTagsRefs,
                }
                if ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC {
                        result.desc = true
@@ -215,6 +229,7 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) (logical.Plan, error)
                sortByTime:              false,
                sortTagSpec:             *sortTagSpec,
                needCompletePushDownAgg: ud.needCompletePushDownAgg,
+               groupByTagsRefs:         groupByTagsRefs,
        }
        if ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC {
                result.desc = true
@@ -226,6 +241,7 @@ type distributedPlan struct {
        s                       logical.Schema
        queryTemplate           *measurev1.QueryRequest
        sortTagSpec             logical.TagSpec
+       groupByTagsRefs         [][]*logical.TagRef
        maxDataPointsSize       uint32
        sortByTime              bool
        desc                    bool
@@ -293,7 +309,11 @@ func (t *distributedPlan) Execute(ctx context.Context) (mi executor.MIterator, e
                span.Tagf("data_point_count", "%d", dataPointCount)
        }
        if t.needCompletePushDownAgg {
-               return &pushedDownAggregatedIterator{dataPoints: pushedDownAggDps}, err
+               deduplicatedDps, dedupErr := deduplicateAggregatedDataPoints(pushedDownAggDps, t.groupByTagsRefs)
+               if dedupErr != nil {
+                       return nil, multierr.Append(err, dedupErr)
+               }
+               return &pushedDownAggregatedIterator{dataPoints: deduplicatedDps}, err
        }
        smi := &sortedMIterator{
                Iterator: sort.NewItemIter(see, t.desc),
@@ -514,3 +534,25 @@ func (s *pushedDownAggregatedIterator) Current() []*measurev1.DataPoint {
 func (s *pushedDownAggregatedIterator) Close() error {
        return nil
 }
+
+// deduplicateAggregatedDataPoints removes duplicate aggregated results from multiple replicas
+// by keeping only one data point per group. Since replicas hold identical data, aggregates
+// for the same group are identical across replicas.
+func deduplicateAggregatedDataPoints(dataPoints []*measurev1.DataPoint, groupByTagsRefs [][]*logical.TagRef) ([]*measurev1.DataPoint, error) {
+       if len(groupByTagsRefs) == 0 {
+               return dataPoints, nil
+       }
+       groupMap := make(map[uint64]struct{})
+       result := make([]*measurev1.DataPoint, 0, len(dataPoints))
+       for _, dp := range dataPoints {
+               key, err := formatGroupByKey(dp, groupByTagsRefs)
+               if err != nil {
+                       return nil, err
+               }
+               if _, exists := groupMap[key]; !exists {
+                       groupMap[key] = struct{}{}
+                       result = append(result, dp)
+               }
+       }
+       return result, nil
+}
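
Editor's note: the new helper keys each data point with a uint64 produced by formatGroupByKey (whose definition is not part of this diff) and keeps only the first data point seen per group; that is safe because replicas return identical aggregates for the same group, so later duplicates can be dropped rather than merged again. A minimal, self-contained sketch of that first-occurrence de-duplication follows; the aggPoint type and the string-based group key are illustrative assumptions standing in for *measurev1.DataPoint and the real tag-ref hashing.

    package main

    import (
        "fmt"
        "strings"
    )

    // aggPoint is a simplified stand-in for *measurev1.DataPoint: one
    // aggregated value per combination of group-by tag values.
    type aggPoint struct {
        groupTags []string // group-by tag values, in projection order
        value     int64    // the pushed-down aggregate (e.g. SUM) for this group
    }

    // dedupByGroup keeps the first aggregated point per group key, preserving
    // input order, and drops later duplicates reported by other replicas.
    func dedupByGroup(points []aggPoint) []aggPoint {
        seen := make(map[string]struct{}, len(points))
        out := make([]aggPoint, 0, len(points))
        for _, p := range points {
            key := strings.Join(p.groupTags, "\x00") // hypothetical key; the real code hashes tag refs
            if _, ok := seen[key]; ok {
                continue
            }
            seen[key] = struct{}{}
            out = append(out, p)
        }
        return out
    }

    func main() {
        // Two replicas each report the same per-group sums.
        points := []aggPoint{
            {groupTags: []string{"svc1"}, value: 6},
            {groupTags: []string{"svc2"}, value: 9},
            {groupTags: []string{"svc1"}, value: 6}, // duplicate from the second replica
            {groupTags: []string{"svc2"}, value: 9},
        }
        fmt.Println(dedupByGroup(points)) // [{[svc1] 6} {[svc2] 9}]
    }
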
diff --git a/test/cases/measure/data/input/group_sum.ql b/test/cases/measure/data/input/group_sum.ql
new file mode 100644
index 00000000..48bd12bf
--- /dev/null
+++ b/test/cases/measure/data/input/group_sum.ql
@@ -0,0 +1,22 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+SELECT id, total::field, value::field, SUM(value) FROM MEASURE service_cpm_minute IN sw_metric
+TIME > '-15m'
+GROUP BY id, value
+
diff --git a/test/cases/measure/data/input/group_sum.yaml b/test/cases/measure/data/input/group_sum.yaml
new file mode 100644
index 00000000..1efc3e52
--- /dev/null
+++ b/test/cases/measure/data/input/group_sum.yaml
@@ -0,0 +1,35 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "service_cpm_minute"
+groups: ["sw_metric"]
+tagProjection:
+  tagFamilies:
+  - name: "default"
+    tags: ["id"]
+fieldProjection:
+  names: ["total", "value"]
+groupBy:
+  tagProjection:
+    tagFamilies:
+    - name: "default"
+      tags: ["id"]
+  fieldName: "value"
+agg:
+  function: "AGGREGATION_FUNCTION_SUM"
+  fieldName: "value"
+
diff --git a/test/cases/measure/data/want/group_sum.yaml b/test/cases/measure/data/want/group_sum.yaml
new file mode 100644
index 00000000..3d8262e4
--- /dev/null
+++ b/test/cases/measure/data/want/group_sum.yaml
@@ -0,0 +1,55 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+dataPoints:
+- fields:
+  - name: value
+    value:
+      int:
+        value: "6"
+  tagFamilies:
+  - name: default
+    tags:
+    - key: id
+      value:
+        str:
+          value: svc1
+- fields:
+  - name: value
+    value:
+      int:
+        value: "9"
+  tagFamilies:
+  - name: default
+    tags:
+    - key: id
+      value:
+        str:
+          value: svc2
+- fields:
+  - name: value
+    value:
+      int:
+        value: "6"
+  tagFamilies:
+  - name: default
+    tags:
+    - key: id
+      value:
+        str:
+          value: svc3
+
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index c680912d..f7526e2c 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -50,6 +50,7 @@ var _ = g.DescribeTable("Scanning Measures", verify,
        g.Entry("filter by an unknown tag", helpers.Args{Input: 
"tag_filter_unknown", Duration: 25 * time.Minute, Offset: -20 * time.Minute, 
WantEmpty: true}),
        g.Entry("group and max", helpers.Args{Input: "group_max", Duration: 25 
* time.Minute, Offset: -20 * time.Minute}),
        g.Entry("group and min", helpers.Args{Input: "group_min", Duration: 25 
* time.Minute, Offset: -20 * time.Minute}),
+       g.Entry("group and sum", helpers.Args{Input: "group_sum", Duration: 25 
* time.Minute, Offset: -20 * time.Minute}),
        g.Entry("group without field", helpers.Args{Input: "group_no_field", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("top 2 by id", helpers.Args{Input: "top", Duration: 25 * 
time.Minute, Offset: -20 * time.Minute}),
        g.Entry("bottom 2 by id", helpers.Args{Input: "bottom", Duration: 25 * 
time.Minute, Offset: -20 * time.Minute}),
