This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new ab3de192 Push down sum agg to data node (#924)
ab3de192 is described below
commit ab3de19288c77701a38e46267ee98509e9d386c2
Author: OmCheeLin <[email protected]>
AuthorDate: Thu Jan 8 09:46:37 2026 +0800
Push down sum agg to data node (#924)
Co-authored-by: Gao Hongtao <[email protected]>
Co-authored-by: 吴晟 Wu Sheng <[email protected]>
Co-authored-by: Copilot <[email protected]>
---
pkg/query/logical/measure/measure_analyzer.go | 3 +-
.../logical/measure/measure_plan_distributed.go | 44 ++++++++++++++++-
test/cases/measure/data/input/group_sum.ql | 22 +++++++++
test/cases/measure/data/input/group_sum.yaml | 35 ++++++++++++++
test/cases/measure/data/want/group_sum.yaml | 55 ++++++++++++++++++++++
test/cases/measure/measure.go | 1 +
6 files changed, 158 insertions(+), 2 deletions(-)
diff --git a/pkg/query/logical/measure/measure_analyzer.go
b/pkg/query/logical/measure/measure_analyzer.go
index 8932006c..85dcd7de 100644
--- a/pkg/query/logical/measure/measure_analyzer.go
+++ b/pkg/query/logical/measure/measure_analyzer.go
@@ -160,7 +160,8 @@ func DistributedAnalyze(criteria *measurev1.QueryRequest,
ss []logical.Schema) (
// TODO: to support all aggregation functions
needCompletePushDownAgg := criteria.GetAgg() != nil &&
(criteria.GetAgg().GetFunction() ==
modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX ||
- criteria.GetAgg().GetFunction() ==
modelv1.AggregationFunction_AGGREGATION_FUNCTION_MIN) &&
+ criteria.GetAgg().GetFunction() ==
modelv1.AggregationFunction_AGGREGATION_FUNCTION_MIN ||
+ criteria.GetAgg().GetFunction() ==
modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM) &&
criteria.GetTop() == nil
// parse fields
diff --git a/pkg/query/logical/measure/measure_plan_distributed.go
b/pkg/query/logical/measure/measure_plan_distributed.go
index bd3ebfcc..6f774c67 100644
--- a/pkg/query/logical/measure/measure_plan_distributed.go
+++ b/pkg/query/logical/measure/measure_plan_distributed.go
@@ -160,6 +160,17 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema)
(logical.Plan, error)
temp.Top = ud.originalQuery.Top
temp.GroupBy = ud.originalQuery.GroupBy
}
+ // Prepare groupBy tags refs if needed for deduplication
+ var groupByTagsRefs [][]*logical.TagRef
+ if ud.needCompletePushDownAgg && ud.originalQuery.GetGroupBy() != nil {
+ groupByTags :=
logical.ToTags(ud.originalQuery.GetGroupBy().GetTagProjection())
+ var err error
+ groupByTagsRefs, err = s.CreateTagRef(groupByTags...)
+ if err != nil {
+ return nil, err
+ }
+ }
+
if ud.groupByEntity {
e := s.EntityList()[0]
sortTagSpec := s.FindTagSpecByName(e)
@@ -172,6 +183,7 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema)
(logical.Plan, error)
sortByTime: false,
sortTagSpec: *sortTagSpec,
needCompletePushDownAgg: ud.needCompletePushDownAgg,
+ groupByTagsRefs: groupByTagsRefs,
}
if ud.originalQuery.OrderBy != nil &&
ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC {
result.desc = true
@@ -184,6 +196,7 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema)
(logical.Plan, error)
s: s,
sortByTime: true,
needCompletePushDownAgg: ud.needCompletePushDownAgg,
+ groupByTagsRefs: groupByTagsRefs,
}, nil
}
if ud.originalQuery.OrderBy.IndexRuleName == "" {
@@ -192,6 +205,7 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema)
(logical.Plan, error)
s: s,
sortByTime: true,
needCompletePushDownAgg: ud.needCompletePushDownAgg,
+ groupByTagsRefs: groupByTagsRefs,
}
if ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC {
result.desc = true
@@ -215,6 +229,7 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema)
(logical.Plan, error)
sortByTime: false,
sortTagSpec: *sortTagSpec,
needCompletePushDownAgg: ud.needCompletePushDownAgg,
+ groupByTagsRefs: groupByTagsRefs,
}
if ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC {
result.desc = true
@@ -226,6 +241,7 @@ type distributedPlan struct {
s logical.Schema
queryTemplate *measurev1.QueryRequest
sortTagSpec logical.TagSpec
+ groupByTagsRefs [][]*logical.TagRef
maxDataPointsSize uint32
sortByTime bool
desc bool
@@ -293,7 +309,11 @@ func (t *distributedPlan) Execute(ctx context.Context) (mi
executor.MIterator, e
span.Tagf("data_point_count", "%d", dataPointCount)
}
if t.needCompletePushDownAgg {
- return &pushedDownAggregatedIterator{dataPoints:
pushedDownAggDps}, err
+ deduplicatedDps, dedupErr :=
deduplicateAggregatedDataPoints(pushedDownAggDps, t.groupByTagsRefs)
+ if dedupErr != nil {
+ return nil, multierr.Append(err, dedupErr)
+ }
+ return &pushedDownAggregatedIterator{dataPoints:
deduplicatedDps}, err
}
smi := &sortedMIterator{
Iterator: sort.NewItemIter(see, t.desc),
@@ -514,3 +534,25 @@ func (s *pushedDownAggregatedIterator) Current()
[]*measurev1.DataPoint {
// Close does nothing for pushedDownAggregatedIterator and always returns nil.
func (s *pushedDownAggregatedIterator) Close() error {
return nil
}
+
+// deduplicateAggregatedDataPoints removes duplicate aggregated results returned
+// by multiple replicas, keeping only the first data point seen for each group.
+// Since replicas hold identical data, the aggregates for a given group are
+// expected to be identical across replicas, so dropping later copies should
+// not change the query result.
+//
+// Parameters:
+//   - dataPoints: the aggregated data points to deduplicate.
+//   - groupByTagsRefs: tag references used to compute each point's group key;
+//     when empty, dataPoints is returned unchanged (no deduplication).
+//
+// Returns the kept data points in the order their groups were first
+// encountered, or nil and the first error reported by formatGroupByKey.
+//
+// NOTE(review): this is only correct if every returned data point for a group
+// carries that group's complete aggregate. If a group's data can be spread
+// across several shards, the per-shard partial SUMs differ and all but the
+// first would be silently dropped — confirm shard routing guarantees this.
+func deduplicateAggregatedDataPoints(dataPoints []*measurev1.DataPoint, groupByTagsRefs [][]*logical.TagRef) ([]*measurev1.DataPoint, error) {
+ // Without group-by tags there is no key to deduplicate on, so every point is
+ // passed through as-is (the same slice, not a copy).
+ // NOTE(review): replica copies are therefore not removed for non-grouped
+ // aggregations — confirm this path cannot receive duplicates.
+ if len(groupByTagsRefs) == 0 {
+ return dataPoints, nil
+ }
+ // Set of group keys already emitted.
+ groupMap := make(map[uint64]struct{})
+ result := make([]*measurev1.DataPoint, 0, len(dataPoints))
+ for _, dp := range dataPoints {
+ // NOTE(review): the key is a uint64 (presumably a hash of the group-by tag
+ // values); two distinct groups that collide would be merged here.
+ key, err := formatGroupByKey(dp, groupByTagsRefs)
+ if err != nil {
+ return nil, err
+ }
+ // Keep only the first data point per group; later copies are dropped.
+ if _, exists := groupMap[key]; !exists {
+ groupMap[key] = struct{}{}
+ result = append(result, dp)
+ }
+ }
+ return result, nil
+}
diff --git a/test/cases/measure/data/input/group_sum.ql
b/test/cases/measure/data/input/group_sum.ql
new file mode 100644
index 00000000..48bd12bf
--- /dev/null
+++ b/test/cases/measure/data/input/group_sum.ql
@@ -0,0 +1,22 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+SELECT id, total::field, value::field, SUM(value) FROM MEASURE
service_cpm_minute IN sw_metric
+TIME > '-15m'
+GROUP BY id, value
+
diff --git a/test/cases/measure/data/input/group_sum.yaml
b/test/cases/measure/data/input/group_sum.yaml
new file mode 100644
index 00000000..1efc3e52
--- /dev/null
+++ b/test/cases/measure/data/input/group_sum.yaml
@@ -0,0 +1,35 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "service_cpm_minute"
+groups: ["sw_metric"]
+tagProjection:
+ tagFamilies:
+ - name: "default"
+ tags: ["id"]
+fieldProjection:
+ names: ["total", "value"]
+groupBy:
+ tagProjection:
+ tagFamilies:
+ - name: "default"
+ tags: ["id"]
+ fieldName: "value"
+agg:
+ function: "AGGREGATION_FUNCTION_SUM"
+ fieldName: "value"
+
diff --git a/test/cases/measure/data/want/group_sum.yaml
b/test/cases/measure/data/want/group_sum.yaml
new file mode 100644
index 00000000..3d8262e4
--- /dev/null
+++ b/test/cases/measure/data/want/group_sum.yaml
@@ -0,0 +1,55 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+dataPoints:
+- fields:
+ - name: value
+ value:
+ int:
+ value: "6"
+ tagFamilies:
+ - name: default
+ tags:
+ - key: id
+ value:
+ str:
+ value: svc1
+- fields:
+ - name: value
+ value:
+ int:
+ value: "9"
+ tagFamilies:
+ - name: default
+ tags:
+ - key: id
+ value:
+ str:
+ value: svc2
+- fields:
+ - name: value
+ value:
+ int:
+ value: "6"
+ tagFamilies:
+ - name: default
+ tags:
+ - key: id
+ value:
+ str:
+ value: svc3
+
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index c680912d..f7526e2c 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -50,6 +50,7 @@ var _ = g.DescribeTable("Scanning Measures", verify,
g.Entry("filter by an unknown tag", helpers.Args{Input:
"tag_filter_unknown", Duration: 25 * time.Minute, Offset: -20 * time.Minute,
WantEmpty: true}),
g.Entry("group and max", helpers.Args{Input: "group_max", Duration: 25
* time.Minute, Offset: -20 * time.Minute}),
g.Entry("group and min", helpers.Args{Input: "group_min", Duration: 25
* time.Minute, Offset: -20 * time.Minute}),
+ g.Entry("group and sum", helpers.Args{Input: "group_sum", Duration: 25
* time.Minute, Offset: -20 * time.Minute}),
g.Entry("group without field", helpers.Args{Input: "group_no_field",
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
g.Entry("top 2 by id", helpers.Args{Input: "top", Duration: 25 *
time.Minute, Offset: -20 * time.Minute}),
g.Entry("bottom 2 by id", helpers.Args{Input: "bottom", Duration: 25 *
time.Minute, Offset: -20 * time.Minute}),