This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new b3d1f2f7 Refactor TopN Aggregation Result Query Process (#937)
b3d1f2f7 is described below
commit b3d1f2f7c174d67ece9cc6c8583cfbff8e84f540
Author: peachisai <[email protected]>
AuthorDate: Mon Jan 19 09:05:17 2026 +0800
Refactor TopN Aggregation Result Query Process (#937)
* Refactor TopN Aggregation Result Query Process
---
api/proto/banyandb/measure/v1/topn.proto | 2 +
banyand/dquery/topn.go | 30 +-
banyand/measure/block.go | 111 ++++++-
banyand/measure/query.go | 24 +-
banyand/measure/topn.go | 25 +-
banyand/measure/topn_post_processor.go | 344 ++++++++++++++++++++
banyand/measure/topn_post_processor_test.go | 157 +++++++++
banyand/query/processor_topn.go | 356 +++------------------
pkg/query/logical/measure/topn_analyzer.go | 2 +
pkg/query/logical/measure/topn_plan_localscan.go | 24 +-
pkg/query/model/model.go | 2 +
.../measures/endpoint_resp_time_minute.json | 37 +++
.../endpoint_resp_time_minute_top_bottom_100.json | 17 +
test/cases/init.go | 2 +
.../testdata/endpoint_resp_time_minute_data.json | 102 ++++++
.../testdata/endpoint_resp_time_minute_data1.json | 102 ++++++
test/cases/topn/data/input/aggr_version_merged.ql | 22 ++
.../cases/topn/data/input/aggr_version_merged.yaml | 22 ++
test/cases/topn/data/want/aggr_version_merged.yaml | 43 +++
test/cases/topn/topn.go | 1 +
20 files changed, 1082 insertions(+), 343 deletions(-)
diff --git a/api/proto/banyandb/measure/v1/topn.proto b/api/proto/banyandb/measure/v1/topn.proto
index 48afc024..712932de 100644
--- a/api/proto/banyandb/measure/v1/topn.proto
+++ b/api/proto/banyandb/measure/v1/topn.proto
@@ -35,6 +35,8 @@ message TopNList {
message Item {
repeated model.v1.Tag entity = 1;
model.v1.FieldValue value = 2;
+ int64 version = 3;
+ google.protobuf.Timestamp timestamp = 4;
}
// items contains top-n items in a list
repeated Item items = 2;
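
Assuming the Go bindings are regenerated from the proto change above, here is a minimal sketch of how the two new fields surface to callers — the field names match the `toTopNResponse` change later in this commit, while the literal values are hypothetical:

package main

import (
	"fmt"
	"time"

	"google.golang.org/protobuf/types/known/timestamppb"

	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
)

func main() {
	// Each Top-N item now carries its own write version and timestamp,
	// instead of relying on the per-list timestamp alone.
	item := &measurev1.TopNList_Item{
		Entity: []*modelv1.Tag{{
			Key:   "entity_id",
			Value: &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "entity_1"}}},
		}},
		Value:     &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 1000}}},
		Version:   time.Now().UnixNano(), // write version, used for deduplication across shards
		Timestamp: timestamppb.Now(),
	}
	fmt.Println(item.GetVersion(), item.GetTimestamp().AsTime())
}

The per-item version is what lets the distributed query layer deduplicate conflicting shard results, as the dquery change below shows.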
diff --git a/banyand/dquery/topn.go b/banyand/dquery/topn.go
index d96b78c2..d50290ba 100644
--- a/banyand/dquery/topn.go
+++ b/banyand/dquery/topn.go
@@ -29,7 +29,6 @@ import (
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/measure"
- "github.com/apache/skywalking-banyandb/banyand/query"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/iter/sort"
@@ -55,12 +54,8 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp
}
n := time.Now()
now := bus.MessageID(request.TimeRange.Begin.Nanos)
- if request.GetFieldValueSort() == modelv1.Sort_SORT_UNSPECIFIED {
- resp = bus.NewMessage(now, common.NewError("unspecified requested sort direction"))
- return
- }
- if request.GetAgg() == modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED {
- resp = bus.NewMessage(now, common.NewError("unspecified requested aggregation function"))
+ if err := t.validateRequest(request); err != nil {
+ resp = bus.NewMessage(now, common.NewError("%s", err.Error()))
return
}
if e := t.log.Debug(); e.Enabled() {
@@ -116,7 +111,7 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp
return
}
var allErr error
- aggregator := query.CreateTopNPostAggregator(request.GetTopN(),
+ aggregator := measure.CreateTopNPostProcessor(request.GetTopN(),
agg, request.GetFieldValueSort())
var tags []string
var responseCount int
@@ -142,7 +137,7 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp
for _, e := range tn.Entity {
entityValues = append(entityValues, e.Value)
}
- _ = aggregator.Put(entityValues, tn.Value.GetInt().GetValue(), uint64(l.Timestamp.AsTime().UnixMilli()))
+ aggregator.Put(entityValues, tn.Value.GetInt().GetValue(), uint64(tn.Timestamp.AsTime().UnixMilli()), tn.Version)
}
}
}
@@ -158,7 +153,12 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp
resp = bus.NewMessage(now, &measurev1.TopNResponse{})
return
}
- lists := aggregator.Val(tags)
+ lists, err := aggregator.Val(tags)
+ if err != nil {
+ resp = bus.NewMessage(now, common.NewError("failed to
post-aggregate %s: %v", request.GetName(), err))
+ return
+ }
+
if span != nil {
span.Tagf("list_count", "%d", len(lists))
}
@@ -206,3 +206,13 @@ func (s *sortedTopNList) Next() bool {
func (s *sortedTopNList) Val() *comparableTopNItem {
return &comparableTopNItem{s.Items[s.index-1]}
}
+
+func (t *topNQueryProcessor) validateRequest(request *measurev1.TopNRequest) error {
+ if request.GetFieldValueSort() == modelv1.Sort_SORT_UNSPECIFIED {
+ return errors.New("unspecified requested sort direction")
+ }
+ if request.GetAgg() == modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED {
+ return errors.New("unspecified requested aggregation function")
+ }
+ return nil
+}
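
Putting the liaison-side changes together, a minimal, self-contained sketch of the new flow — create the shared post-processor from the `measure` package, feed each item with its timestamp and version, then collect the lists; the numeric values are hypothetical:

package main

import (
	"fmt"

	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
	"github.com/apache/skywalking-banyandb/banyand/measure"
	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
)

func main() {
	// Top 3, MAX aggregation, descending sort — mirroring the request fields above.
	processor := measure.CreateTopNPostProcessor(3,
		modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX, modelv1.Sort_SORT_DESC)

	entity := func(id string) pbv1.EntityValues {
		return pbv1.EntityValues{{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: id}}}}
	}
	// Two shards report the same entity at the same minute; the higher
	// version wins inside the processor before aggregation happens.
	processor.Put(entity("entity_1"), 100, 1700000000000, 1)
	processor.Put(entity("entity_1"), 1000, 1700000000000, 2)
	processor.Put(entity("entity_2"), 200, 1700000000000, 1)

	lists, err := processor.Val([]string{"entity_id"})
	if err != nil {
		panic(err)
	}
	fmt.Println(lists)
}

The two `Put` calls for entity_1 illustrate the behavioral change: duplicates are now resolved by version before aggregation, rather than every incoming value being folded in blindly.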
diff --git a/banyand/measure/block.go b/banyand/measure/block.go
index 59f50241..24c9cd48 100644
--- a/banyand/measure/block.go
+++ b/banyand/measure/block.go
@@ -666,9 +666,118 @@ func (bc *blockCursor) copyTo(r *model.MeasureResult, storedIndexValue map[commo
}
}
+func (bc *blockCursor) mergeTopNResult(r *model.MeasureResult, storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue,
+ topNPostAggregator PostProcessor,
+) {
+ r.SID = bc.bm.seriesID
+ var indexValue map[string]*modelv1.TagValue
+ if storedIndexValue != nil {
+ indexValue = storedIndexValue[r.SID]
+ }
+ for i := range r.TagFamilies {
+ tfName := r.TagFamilies[i].Name
+ var cf *columnFamily
+ for j := range r.TagFamilies[i].Tags {
+ tagName := r.TagFamilies[i].Tags[j].Name
+ if indexValue != nil && indexValue[tagName] != nil {
+ r.TagFamilies[i].Tags[j].Values[len(r.TagFamilies[i].Tags[j].Values)-1] = indexValue[tagName]
+ continue
+ }
+ if cf == nil {
+ for i := range bc.tagFamilies {
+ if bc.tagFamilies[i].name == tfName {
+ cf = &bc.tagFamilies[i]
+ break
+ }
+ }
+ }
+ for _, c := range cf.columns {
+ if c.name == tagName {
+ schemaType, hasSchemaType := bc.schemaTagTypes[tagName]
+ if hasSchemaType && c.valueType == schemaType {
+ r.TagFamilies[i].Tags[j].Values[len(r.TagFamilies[i].Tags[j].Values)-1] = mustDecodeTagValue(c.valueType, c.values[bc.idx])
+ } else {
+ r.TagFamilies[i].Tags[j].Values[len(r.TagFamilies[i].Tags[j].Values)-1] = pbv1.NullTagValue
+ }
+ break
+ }
+ }
+ }
+ }
+
+ topNValue := GenerateTopNValue()
+ defer ReleaseTopNValue(topNValue)
+ decoder := GenerateTopNValuesDecoder()
+ defer ReleaseTopNValuesDecoder(decoder)
+
+ uTimestamps := uint64(bc.timestamps[bc.idx])
+
+ for i, c := range bc.fields.columns {
+ srcFieldValue := r.Fields[i].Values[len(r.Fields[i].Values)-1]
+ destFieldValue := mustDecodeFieldValue(c.valueType, c.values[bc.idx])
+
+ topNValue.Reset()
+
+ if err := topNValue.Unmarshal(srcFieldValue.GetBinaryData(), decoder); err != nil {
+ log.Error().Err(err).Msg("failed to unmarshal topN value, skip current batch")
+ continue
+ }
+
+ valueName := topNValue.valueName
+ entityTagNames := topNValue.entityTagNames
+
+ for j, entityList := range topNValue.entities {
+ entityValues := make(pbv1.EntityValues, 0, len(entityList))
+ for _, e := range entityList {
+ entityValues = append(entityValues, e)
+ }
+ topNPostAggregator.Put(entityValues, topNValue.values[j], uTimestamps, r.Versions[len(r.Versions)-1])
+ }
+
+ topNValue.Reset()
+ if err := topNValue.Unmarshal(destFieldValue.GetBinaryData(), decoder); err != nil {
+ log.Error().Err(err).Msg("failed to unmarshal topN value, skip current batch")
+ continue
+ }
+
+ for j, entityList := range topNValue.entities {
+ entityValues := make(pbv1.EntityValues, 0, len(entityList))
+ for _, e := range entityList {
+ entityValues = append(entityValues, e)
+ }
+ topNPostAggregator.Put(entityValues, topNValue.values[j], uTimestamps, bc.versions[bc.idx])
+ }
+
+ items, err := topNPostAggregator.Flush()
+ if err != nil {
+ log.Error().Err(err).Msg("failed to flush aggregator,
skip current batch")
+ continue
+ }
+
+ topNValue.Reset()
+ topNValue.setMetadata(valueName, entityTagNames)
+
+ for _, item := range items {
+ topNValue.addValue(item.val, item.values)
+ }
+
+ buf, err := topNValue.marshal(make([]byte, 0, 128))
+ if err != nil {
+ log.Error().Err(err).Msg("failed to marshal topN value,
skip current batch")
+ continue
+ }
+
+ r.Fields[i].Values[len(r.Fields[i].Values)-1] = &modelv1.FieldValue{
+ Value: &modelv1.FieldValue_BinaryData{
+ BinaryData: buf,
+ },
+ }
+ r.Versions[len(r.Versions)-1] = bc.versions[bc.idx]
+ }
+}
+
func (bc *blockCursor) replace(r *model.MeasureResult, storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue) {
r.SID = bc.bm.seriesID
- r.Timestamps[len(r.Timestamps)-1] = bc.timestamps[bc.idx]
r.Versions[len(r.Versions)-1] = bc.versions[bc.idx]
var indexValue map[string]*modelv1.TagValue
if storedIndexValue != nil {
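
Condensing the cycle above: each persisted `_top_n_result` field is an encoded TopNValue blob, so merging two blocks means decoding both blobs, replaying every entry through the PostProcessor (which keeps the highest version per entity and timestamp), and re-encoding the flushed winners. A hypothetical in-package condensation of that per-field work — `mergeTopNField` is illustrative only, not part of the change:

package measure

import (
	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
)

// mergeTopNField sketches mergeTopNResult's per-field work: decode both
// Top-N blobs, replay them through the post-processor (which deduplicates
// by version), then re-encode the surviving items.
func mergeTopNField(src, dest []byte, tsMillis uint64, srcVersion, destVersion int64, p PostProcessor) ([]byte, error) {
	v := GenerateTopNValue()
	defer ReleaseTopNValue(v)
	d := GenerateTopNValuesDecoder()
	defer ReleaseTopNValuesDecoder(d)

	for _, blob := range []struct {
		data    []byte
		version int64
	}{{src, srcVersion}, {dest, destVersion}} {
		v.Reset()
		if err := v.Unmarshal(blob.data, d); err != nil {
			return nil, err
		}
		for j, entity := range v.entities {
			p.Put(pbv1.EntityValues(entity), v.values[j], tsMillis, blob.version)
		}
	}
	items, err := p.Flush()
	if err != nil {
		return nil, err
	}
	out := GenerateTopNValue()
	defer ReleaseTopNValue(out)
	// v still holds the metadata of the last decoded blob.
	out.setMetadata(v.valueName, v.entityTagNames)
	for _, it := range items {
		out.addValue(it.val, it.values)
	}
	return out.marshal(make([]byte, 0, 128))
}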
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index 426c5963..22fa8aa4 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -71,6 +71,11 @@ type queryOptions struct {
maxTimestamp int64
}
+type topNQueryOptions struct {
+ sortDirection modelv1.Sort
+ number int32
+}
+
func (m *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) (mqr model.MeasureQueryResult, err error) {
if mqo.TimeRange == nil {
return nil, errors.New("invalid query options: timeRange are
required")
@@ -190,6 +195,13 @@ func (m *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) (mqr
}
}
+ if mqo.Name == "_top_n_result" {
+ result.topNQueryOptions = &topNQueryOptions{
+ sortDirection: mqo.Sort,
+ number: mqo.Number,
+ }
+ }
+
return &result, nil
}
@@ -675,6 +687,7 @@ func binaryDataFieldValue(value []byte) *modelv1.FieldValue {
type queryResult struct {
ctx context.Context
+ topNQueryOptions *topNQueryOptions
sidToIndex map[common.SeriesID]int
storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue
tagProjection []model.TagProjection
@@ -842,6 +855,13 @@ func (qr *queryResult) merge(storedIndexValue map[common.SeriesID]map[string]*mo
var lastVersion int64
var lastSid common.SeriesID
+ var topNPostAggregator PostProcessor
+
+ if qr.topNQueryOptions != nil {
+ topNPostAggregator = CreateTopNPostProcessor(qr.topNQueryOptions.number, modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED,
+ qr.topNQueryOptions.sortDirection)
+ }
+
for qr.Len() > 0 {
topBC := qr.data[0]
if lastSid != 0 && topBC.bm.seriesID != lastSid {
@@ -851,7 +871,9 @@ func (qr *queryResult) merge(storedIndexValue map[common.SeriesID]map[string]*mo
if len(result.Timestamps) > 0 &&
topBC.timestamps[topBC.idx] == result.Timestamps[len(result.Timestamps)-1] {
- if topBC.versions[topBC.idx] > lastVersion {
+ if topNPostAggregator != nil {
+ topBC.mergeTopNResult(result, storedIndexValue, topNPostAggregator)
+ } else if topBC.versions[topBC.idx] > lastVersion {
topBC.replace(result, storedIndexValue)
}
} else {
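
The merge loop now branches on `topNQueryOptions`: when it is set (i.e. the reserved `_top_n_result` measure from the `Query` hunk above), colliding timestamps are merged through `mergeTopNResult` instead of the plain newest-version-wins `replace`. A sketch of query options that would take that branch — `Name` is the field checked in the `Query` hunk, while `Sort` and `Number` are the additions shown in the model.go hunk below:

package main

import (
	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
	"github.com/apache/skywalking-banyandb/pkg/query/model"
)

func main() {
	// Time range, entities, and projections are omitted for brevity.
	mqo := model.MeasureQueryOptions{
		Name:   "_top_n_result", // reserved internal measure name
		Sort:   modelv1.Sort_SORT_DESC,
		Number: 3, // top-N size propagated into the merge loop
	}
	_ = mqo
}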
diff --git a/banyand/measure/topn.go b/banyand/measure/topn.go
index d595b105..3c042017 100644
--- a/banyand/measure/topn.go
+++ b/banyand/measure/topn.go
@@ -386,13 +386,6 @@ func (t *topNStreamingProcessor) writeStreamRecord(record flow.StreamRecord, buf
},
},
},
- {
- Value: &modelv1.TagValue_Str{
- Str: &modelv1.Str{
- Value: t.nodeID,
- },
- },
- },
}
buf = buf[:0]
if buf, err = topNValue.marshal(buf); err != nil {
@@ -414,6 +407,7 @@ func (t *topNStreamingProcessor) writeStreamRecord(record flow.StreamRecord, buf
},
},
},
+ Version: time.Now().UnixNano(),
},
},
EntityValues: entityValues,
@@ -953,3 +947,20 @@ func (t *TopNValue) Unmarshal(src []byte, decoder *encoding.BytesBlockDecoder) e
func GroupName(groupTags []string) string {
return strings.Join(groupTags, "|")
}
+
+// GenerateTopNValuesDecoder returns a new decoder instance of TopNValues.
+func GenerateTopNValuesDecoder() *encoding.BytesBlockDecoder {
+ v := topNValuesDecoderPool.Get()
+ if v == nil {
+ return &encoding.BytesBlockDecoder{}
+ }
+ return v
+}
+
+// ReleaseTopNValuesDecoder releases a decoder instance of TopNValues.
+func ReleaseTopNValuesDecoder(d *encoding.BytesBlockDecoder) {
+ d.Reset()
+ topNValuesDecoderPool.Put(d)
+}
+
+var topNValuesDecoderPool = pool.Register[*encoding.BytesBlockDecoder]("topn-valueDecoder")
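
With the decoder pool promoted into the `measure` package, both the scan iterator and the new block merger share one pool. A minimal sketch of the intended borrow/release pattern, mirroring the localscan change later in this commit:

package main

import "github.com/apache/skywalking-banyandb/banyand/measure"

func decode(encodedBlob []byte) error {
	// Borrow pooled instances and always return them via defer; Release
	// resets the decoder before putting it back into the pool.
	topNValue := measure.GenerateTopNValue()
	defer measure.ReleaseTopNValue(topNValue)
	decoder := measure.GenerateTopNValuesDecoder()
	defer measure.ReleaseTopNValuesDecoder(decoder)

	return topNValue.Unmarshal(encodedBlob, decoder)
}

func main() { _ = decode(nil) }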
diff --git a/banyand/measure/topn_post_processor.go b/banyand/measure/topn_post_processor.go
new file mode 100644
index 00000000..fde0f5a4
--- /dev/null
+++ b/banyand/measure/topn_post_processor.go
@@ -0,0 +1,344 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+ "container/heap"
+ "slices"
+ "time"
+
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+ modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ "github.com/apache/skywalking-banyandb/pkg/flow"
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/query/aggregation"
+)
+
+// PostProcessor defines necessary methods for Top-N post processor with or without aggregation.
+type PostProcessor interface {
+ Put(entityValues pbv1.EntityValues, val int64, timestampMillis uint64, version int64)
+ Flush() ([]*topNAggregatorItem, error)
+ Val([]string) ([]*measurev1.TopNList, error)
+}
+
+// CreateTopNPostProcessor creates a Top-N post processor with or without aggregation.
+func CreateTopNPostProcessor(topN int32, aggrFunc modelv1.AggregationFunction, sort modelv1.Sort) PostProcessor {
+ if aggrFunc == modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED {
+ // if aggregation is not specified, we have to keep all timelines
+ return &topNPostProcessor{
+ topN: topN,
+ sort: sort,
+ timelines: make(map[uint64]*topNTimelineItem),
+ }
+ }
+ aggregator := &topNPostProcessor{
+ topN: topN,
+ sort: sort,
+ aggrFunc: aggrFunc,
+ cache: make(map[string]*topNAggregatorItem),
+ timelines: make(map[uint64]*topNTimelineItem),
+ items: make([]*topNAggregatorItem, 0, topN),
+ }
+ heap.Init(aggregator)
+ return aggregator
+}
+
+func (taggr *topNPostProcessor) Len() int {
+ return len(taggr.items)
+}
+
+// Less reports whether min/max heap has to be built.
+// For DESC, a min heap has to be built,
+// while for ASC, a max heap has to be built.
+func (taggr *topNPostProcessor) Less(i, j int) bool {
+ if taggr.sort == modelv1.Sort_SORT_DESC {
+ return taggr.items[i].int64Func.Val() < taggr.items[j].int64Func.Val()
+ }
+ return taggr.items[i].int64Func.Val() > taggr.items[j].int64Func.Val()
+}
+
+func (taggr *topNPostProcessor) Swap(i, j int) {
+ taggr.items[i], taggr.items[j] = taggr.items[j], taggr.items[i]
+ taggr.items[i].index = i
+ taggr.items[j].index = j
+}
+
+func (taggr *topNPostProcessor) Push(x any) {
+ n := len(taggr.items)
+ item := x.(*topNAggregatorItem)
+ item.index = n
+ taggr.items = append(taggr.items, item)
+}
+
+func (taggr *topNPostProcessor) Pop() any {
+ old := taggr.items
+ n := len(old)
+ item := old[n-1]
+ old[n-1] = nil
+ item.index = -1
+ taggr.items = old[0 : n-1]
+ return item
+}
+
+func (taggr *topNPostProcessor) tryEnqueue(key string, item *topNAggregatorItem) {
+ if lowest := taggr.items[0]; lowest != nil {
+ shouldReplace := (taggr.sort == modelv1.Sort_SORT_DESC && lowest.int64Func.Val() < item.int64Func.Val()) ||
+ (taggr.sort != modelv1.Sort_SORT_DESC && lowest.int64Func.Val() > item.int64Func.Val())
+
+ if shouldReplace {
+ delete(taggr.cache, lowest.key)
+ taggr.cache[key] = item
+ taggr.items[0] = item
+ item.index = 0
+ heap.Fix(taggr, 0)
+ }
+ }
+}
+
+var _ flow.Element = (*topNAggregatorItem)(nil)
+
+type topNAggregatorItem struct {
+ int64Func aggregation.Func[int64]
+ key string
+ values pbv1.EntityValues
+ val int64
+ version int64
+ index int
+}
+
+func (n *topNAggregatorItem) GetTags(tagNames []string) []*modelv1.Tag {
+ tags := make([]*modelv1.Tag, len(n.values))
+ for i := 0; i < len(tags); i++ {
+ tags[i] = &modelv1.Tag{
+ Key: tagNames[i],
+ Value: n.values[i],
+ }
+ }
+ return tags
+}
+
+func (n *topNAggregatorItem) GetIndex() int {
+ return n.index
+}
+
+func (n *topNAggregatorItem) SetIndex(i int) {
+ n.index = i
+}
+
+type topNTimelineItem struct {
+ queue *flow.DedupPriorityQueue
+ items map[string]*topNAggregatorItem
+}
+
+type topNPostProcessor struct {
+ cache map[string]*topNAggregatorItem
+ timelines map[uint64]*topNTimelineItem
+ items []*topNAggregatorItem
+ sort modelv1.Sort
+ aggrFunc modelv1.AggregationFunction
+ topN int32
+}
+
+func (taggr *topNPostProcessor) Put(entityValues pbv1.EntityValues, val int64, timestampMillis uint64, version int64) {
+ timeline, ok := taggr.timelines[timestampMillis]
+ key := entityValues.String()
+ if !ok {
+ timeline = &topNTimelineItem{
+ queue: flow.NewPriorityQueue(func(a, b interface{}) int {
+ if taggr.sort == modelv1.Sort_SORT_DESC {
+ if a.(*topNAggregatorItem).val < b.(*topNAggregatorItem).val {
+ return -1
+ } else if a.(*topNAggregatorItem).val == b.(*topNAggregatorItem).val {
+ return 0
+ }
+ return 1
+ }
+ if a.(*topNAggregatorItem).val < b.(*topNAggregatorItem).val {
+ return 1
+ } else if a.(*topNAggregatorItem).val == b.(*topNAggregatorItem).val {
+ return 0
+ }
+ return -1
+ }, false),
+ items: make(map[string]*topNAggregatorItem),
+ }
+
+ newItem := &topNAggregatorItem{
+ val: val,
+ key: key,
+ values: entityValues,
+ version: version,
+ }
+
+ timeline.items[key] = newItem
+ heap.Push(timeline.queue, newItem)
+ taggr.timelines[timestampMillis] = timeline
+ return
+ }
+
+ if item, exist := timeline.items[key]; exist {
+ if version >= item.version {
+ item.val = val
+ item.version = version
+ heap.Fix(timeline.queue, item.index)
+ }
+
+ return
+ }
+
+ newItem := &topNAggregatorItem{
+ val: val,
+ key: key,
+ values: entityValues,
+ version: version,
+ }
+
+ if timeline.queue.Len() < int(taggr.topN) {
+ heap.Push(timeline.queue, newItem)
+ timeline.items[key] = newItem
+ return
+ }
+
+ if lowest := timeline.queue.Peek(); lowest != nil {
+ lowestItem := lowest.(*topNAggregatorItem)
+
+ shouldReplace := (taggr.sort == modelv1.Sort_SORT_DESC && lowestItem.val < val) ||
+ (taggr.sort != modelv1.Sort_SORT_DESC && lowestItem.val > val)
+
+ if shouldReplace {
+ delete(timeline.items, lowestItem.key)
+ timeline.items[key] = newItem
+ timeline.queue.ReplaceLowest(newItem)
+ }
+ }
+}
+
+func (taggr *topNPostProcessor) Flush() ([]*topNAggregatorItem, error) {
+ var result []*topNAggregatorItem
+
+ if taggr.aggrFunc == modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED {
+ for _, timeline := range taggr.timelines {
+ for timeline.queue.Len() > 0 {
+ item := heap.Pop(timeline.queue).(*topNAggregatorItem)
+ result = append(result, item)
+ }
+ }
+ clear(taggr.timelines)
+ } else {
+ for _, timeline := range taggr.timelines {
+ for _, item := range timeline.items {
+ if exist, found := taggr.cache[item.key]; found {
+ exist.int64Func.In(item.val)
+ heap.Fix(taggr, exist.index)
+ continue
+ }
+
+ aggrFunc, err := aggregation.NewFunc[int64](taggr.aggrFunc)
+ if err != nil {
+ return nil, err
+ }
+
+ item.int64Func = aggrFunc
+ item.int64Func.In(item.val)
+
+ if taggr.Len() < int(taggr.topN) {
+ taggr.cache[item.key] = item
+ heap.Push(taggr, item)
+ } else {
+ taggr.tryEnqueue(item.key, item)
+ }
+ }
+ }
+ result = make([]*topNAggregatorItem, 0, taggr.Len())
+ for taggr.Len() > 0 {
+ item := heap.Pop(taggr).(*topNAggregatorItem)
+ result = append(result, item)
+ }
+ }
+
+ return result, nil
+}
+
+func (taggr *topNPostProcessor) Val(tagNames []string) ([]*measurev1.TopNList, error) {
+ if taggr.aggrFunc != modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED {
+ return taggr.valWithAggregation(tagNames)
+ }
+
+ return taggr.valWithoutAggregation(tagNames), nil
+}
+
+func (taggr *topNPostProcessor) valWithAggregation(tagNames []string) ([]*measurev1.TopNList, error) {
+ topNAggregatorItems, err := taggr.Flush()
+ if err != nil {
+ return nil, err
+ }
+ length := len(topNAggregatorItems)
+ items := make([]*measurev1.TopNList_Item, length)
+
+ for i, item := range topNAggregatorItems {
+ targetIdx := length - 1 - i
+
+ items[targetIdx] = &measurev1.TopNList_Item{
+ Entity: item.GetTags(tagNames),
+ Value: &modelv1.FieldValue{
+ Value: &modelv1.FieldValue_Int{
+ Int: &modelv1.Int{Value: item.int64Func.Val()},
+ },
+ },
+ }
+ }
+ return []*measurev1.TopNList{
+ {
+ Timestamp: timestamppb.Now(),
+ Items: items,
+ },
+ }, nil
+}
+
+func (taggr *topNPostProcessor) valWithoutAggregation(tagNames []string) []*measurev1.TopNList {
+ topNLists := make([]*measurev1.TopNList, 0, len(taggr.timelines))
+ for ts, timeline := range taggr.timelines {
+ items := make([]*measurev1.TopNList_Item, timeline.queue.Len())
+ for idx, elem := range timeline.queue.Values() {
+ items[idx] = &measurev1.TopNList_Item{
+ Entity: elem.(*topNAggregatorItem).GetTags(tagNames),
+ Value: &modelv1.FieldValue{
+ Value: &modelv1.FieldValue_Int{
+ Int: &modelv1.Int{Value: elem.(*topNAggregatorItem).val},
+ },
+ },
+ }
+ }
+ topNLists = append(topNLists, &measurev1.TopNList{
+ Timestamp: timestamppb.New(time.Unix(0, int64(ts))),
+ Items: items,
+ })
+ }
+
+ slices.SortStableFunc(topNLists, func(a, b *measurev1.TopNList) int {
+ r := int(a.GetTimestamp().GetSeconds() - b.GetTimestamp().GetSeconds())
+ if r != 0 {
+ return r
+ }
+ return int(a.GetTimestamp().GetNanos() - b.GetTimestamp().GetNanos())
+ })
+
+ return topNLists
+}
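
The heap direction deserves a note: as the `Less` comment above says, keeping the top N under DESC ordering requires a min-heap, so the root is always the weakest retained candidate and a stronger newcomer simply replaces the root and sifts down. A standalone illustration of that invariant with `container/heap`, independent of the types above:

package main

import (
	"container/heap"
	"fmt"
)

// intMinHeap keeps the smallest retained value at the root.
type intMinHeap []int64

func (h intMinHeap) Len() int           { return len(h) }
func (h intMinHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h intMinHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *intMinHeap) Push(x any)        { *h = append(*h, x.(int64)) }
func (h *intMinHeap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

func main() {
	const topN = 3
	h := &intMinHeap{}
	heap.Init(h)
	for _, v := range []int64{100, 1000, 50, 230, 80, 600} {
		if h.Len() < topN {
			heap.Push(h, v)
		} else if (*h)[0] < v { // newcomer beats the weakest retained value
			(*h)[0] = v
			heap.Fix(h, 0)
		}
	}
	fmt.Println(*h) // the three largest values, in heap order
}

The mirrored rule — a max-heap — serves ASC (bottom-N) queries in the same way.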
diff --git a/banyand/measure/topn_post_processor_test.go b/banyand/measure/topn_post_processor_test.go
new file mode 100644
index 00000000..95665cd0
--- /dev/null
+++ b/banyand/measure/topn_post_processor_test.go
@@ -0,0 +1,157 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+ "testing"
+
+ "github.com/google/go-cmp/cmp"
+ "github.com/stretchr/testify/require"
+ "google.golang.org/protobuf/testing/protocmp"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/query/model"
+)
+
+func TestBlockCursor_MergeTopNResult(t *testing.T) {
+ tests := []struct {
+ srcTopNVal *TopNValue
+ destTopNVal *TopNValue
+ wantTopNVal *TopNValue
+ name string
+ sort modelv1.Sort
+ topN int32
+ }{
+ {
+ name: "Test block merge TopN result",
+ srcTopNVal: &TopNValue{
+ valueName: "value",
+ entityTagNames: []string{"entity_id"},
+ values: []int64{1000, 200, 300, 400, 500},
+ entities: [][]*modelv1.TagValue{
+ {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "entity_1"}}}},
+ {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "entity_2"}}}},
+ {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "entity_3"}}}},
+ {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "entity_4"}}}},
+ {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "entity_5"}}}},
+ },
+ },
+ destTopNVal: &TopNValue{
+ valueName: "value",
+ entityTagNames: []string{"entity_id"},
+ values: []int64{550, 200, 500, 600, 400},
+ entities: [][]*modelv1.TagValue{
+ {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "entity_3"}}}},
+ {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "entity_4"}}}},
+ {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "entity_5"}}}},
+ {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "entity_6"}}}},
+ {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "entity_7"}}}},
+ },
+ },
+ sort: modelv1.Sort_SORT_DESC,
+ topN: 3,
+ wantTopNVal: &TopNValue{
+ valueName: "value",
+ entityTagNames: []string{"entity_id"},
+ values: []int64{550, 600, 1000},
+ entities: [][]*modelv1.TagValue{
+ {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "entity_3"}}}},
+ {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "entity_6"}}}},
+ {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "entity_1"}}}},
+ },
+ },
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ srcBuf, err := tt.srcTopNVal.marshal(make([]byte, 0, 256))
+ require.NoError(t, err)
+
+ destBuf, err := tt.destTopNVal.marshal(make([]byte, 0, 256))
+ require.NoError(t, err)
+
+ bc := &blockCursor{
+ bm: blockMetadata{seriesID: 1},
+ tagFamilies: nil,
+ fields: columnFamily{
+ columns: []column{
+ {name: "value", valueType:
pbv1.ValueTypeBinaryData, values: [][]byte{destBuf}},
+ },
+ },
+ timestamps: []int64{1},
+ versions: []int64{2},
+ idx: 0,
+ schemaTagTypes: map[string]pbv1.ValueType{},
+ }
+
+ result := &model.MeasureResult{
+ Timestamps: []int64{1},
+ Versions: []int64{1},
+ TagFamilies: []model.TagFamily{
+ {Name: "_topN", Tags: []model.Tag{
+ {Name: "name", Values:
[]*modelv1.TagValue{pbv1.NullTagValue}},
+ }},
+ },
+ Fields: []model.Field{
+ {
+ Name: "value",
+ Values: []*modelv1.FieldValue{{Value: &modelv1.FieldValue_BinaryData{BinaryData: srcBuf}}},
+ },
+ },
+ }
+
+ topNPostAggregator := CreateTopNPostProcessor(tt.topN, modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED, tt.sort)
+
+ expectedTagValue := "endpoint_resp_time-service"
+ storedIndexValue := map[common.SeriesID]map[string]*modelv1.TagValue{
+ 1: {
+ "name": {
+ Value: &modelv1.TagValue_Str{
+ Str: &modelv1.Str{Value: expectedTagValue},
+ },
+ },
+ },
+ }
+
+ bc.mergeTopNResult(result, storedIndexValue, topNPostAggregator)
+
+ tagValue := result.TagFamilies[0].Tags[0].Values[0]
+ require.Equal(t, expectedTagValue, tagValue.GetStr().GetValue())
+
+ mergedFieldValue := result.Fields[0].Values[0]
+ require.NotNil(t, mergedFieldValue.GetBinaryData())
+
+ gotTopNVal := GenerateTopNValue()
+ defer ReleaseTopNValue(gotTopNVal)
+ decoder := GenerateTopNValuesDecoder()
+ defer ReleaseTopNValuesDecoder(decoder)
+
+ err = gotTopNVal.Unmarshal(mergedFieldValue.GetBinaryData(), decoder)
+ require.NoError(t, err)
+
+ require.Equal(t, tt.wantTopNVal.valueName, gotTopNVal.valueName)
+ require.Equal(t, tt.wantTopNVal.entityTagNames, gotTopNVal.entityTagNames)
+ require.Equal(t, tt.wantTopNVal.values, gotTopNVal.values)
+ diff := cmp.Diff(tt.wantTopNVal.entities, gotTopNVal.entities, protocmp.Transform())
+ require.Empty(t, diff, "entities differ: %s", diff)
+ })
+ }
+}
diff --git a/banyand/query/processor_topn.go b/banyand/query/processor_topn.go
index f3246c1f..90a49bbc 100644
--- a/banyand/query/processor_topn.go
+++ b/banyand/query/processor_topn.go
@@ -18,16 +18,12 @@
package query
import (
- "container/heap"
"context"
"errors"
"fmt"
- "slices"
"strings"
"time"
- "google.golang.org/protobuf/types/known/timestamppb"
-
"github.com/apache/skywalking-banyandb/api/common"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
@@ -35,11 +31,8 @@ import (
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/measure"
"github.com/apache/skywalking-banyandb/pkg/bus"
- "github.com/apache/skywalking-banyandb/pkg/flow"
"github.com/apache/skywalking-banyandb/pkg/logger"
- pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query"
- "github.com/apache/skywalking-banyandb/pkg/query/aggregation"
"github.com/apache/skywalking-banyandb/pkg/query/executor"
logical_measure "github.com/apache/skywalking-banyandb/pkg/query/logical/measure"
)
@@ -50,29 +43,18 @@ type topNQueryProcessor struct {
*bus.UnImplementedHealthyListener
}
-func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp bus.Message) {
- request, ok := message.Data().(*measurev1.TopNRequest)
- n := time.Now()
- now := n.UnixNano()
- if !ok {
- t.log.Warn().Msg("invalid event data type")
- return
- }
- ml := t.log.Named("topn", strings.Join(request.Groups, ","),
request.Name)
- if e := ml.Debug(); e.Enabled() {
- e.RawJSON("req", logger.Proto(request)).Msg("received a topn
event for groups: " + strings.Join(request.Groups, ","))
- }
- if request.GetFieldValueSort() == modelv1.Sort_SORT_UNSPECIFIED {
- t.log.Warn().Msg("invalid requested sort direction")
- return
- }
- if e := t.log.Debug(); e.Enabled() {
- e.Stringer("req", request).Msg("received a topN query event")
+type topNQueryContext struct {
+ sourceMeasureSchemas []*databasev1.Measure
+ topNSchemas []*databasev1.TopNAggregation
+ ecc []executor.MeasureExecutionContext
+}
+
+func (t *topNQueryProcessor) prepareGroupsContext(ctx context.Context, request *measurev1.TopNRequest, ml *logger.Logger) (*topNQueryContext, error) {
+ qc := &topNQueryContext{
+ sourceMeasureSchemas: make([]*databasev1.Measure, 0, len(request.Groups)),
+ topNSchemas: make([]*databasev1.TopNAggregation, 0, len(request.Groups)),
+ ecc: make([]executor.MeasureExecutionContext, 0, len(request.Groups)),
}
- // Process all groups
- var sourceMeasureSchemas []*databasev1.Measure
- var topNSchemas []*databasev1.TopNAggregation
- var ecc []executor.MeasureExecutionContext
for _, group := range request.Groups {
topNMetadata := &commonv1.Metadata{
@@ -84,32 +66,60 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp
t.log.Error().Err(err).
Str("group", group).
Msg("fail to get execution context")
- return
+ return nil, err
}
if topNSchema.GetFieldValueSort() != modelv1.Sort_SORT_UNSPECIFIED &&
topNSchema.GetFieldValueSort() != request.GetFieldValueSort() {
t.log.Warn().Str("group", group).Msg("unmatched sort direction")
- return
+ return nil, err
}
sourceMeasure, err := t.measureService.Measure(topNSchema.GetSourceMeasure())
if err != nil {
t.log.Error().Err(err).
Str("group", group).
Msg("fail to find source measure")
- return
+ return nil, err
}
topNResultMeasure, err := t.measureService.Measure(measure.GetTopNSchemaMetadata(group))
if err != nil {
ml.Error().Err(err).Str("group", group).Msg("fail to
find topn result measure")
- return
+ return nil, err
}
- sourceMeasureSchemas = append(sourceMeasureSchemas, sourceMeasure.GetSchema())
- topNSchemas = append(topNSchemas, topNSchema)
- ecc = append(ecc, topNResultMeasure)
+ qc.sourceMeasureSchemas = append(qc.sourceMeasureSchemas, sourceMeasure.GetSchema())
+ qc.topNSchemas = append(qc.topNSchemas, topNSchema)
+ qc.ecc = append(qc.ecc, topNResultMeasure)
+ }
+
+ return qc, nil
+}
+
+func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp bus.Message) {
+ request, ok := message.Data().(*measurev1.TopNRequest)
+ n := time.Now()
+ now := n.UnixNano()
+ if !ok {
+ t.log.Warn().Msg("invalid event data type")
+ return
+ }
+ ml := t.log.Named("topn", strings.Join(request.Groups, ","),
request.Name)
+ if e := ml.Debug(); e.Enabled() {
+ e.RawJSON("req", logger.Proto(request)).Msg("received a topn
event for groups: " + strings.Join(request.Groups, ","))
+ }
+ if request.GetFieldValueSort() == modelv1.Sort_SORT_UNSPECIFIED {
+ t.log.Warn().Msg("invalid requested sort direction")
+ return
+ }
+ if e := t.log.Debug(); e.Enabled() {
+ e.Stringer("req", request).Msg("received a topN query event")
+ }
+ // Process all groups
+ qc, err := t.prepareGroupsContext(ctx, request, ml)
+ if err != nil {
+ return
}
- plan, err := logical_measure.TopNAnalyze(request, sourceMeasureSchemas, topNSchemas, ecc)
+ plan, err := logical_measure.TopNAnalyze(request, qc.sourceMeasureSchemas, qc.topNSchemas, qc.ecc)
if err != nil {
resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail
to analyze the query request for topn %s: %v", request.Name, err))
return
@@ -186,8 +196,10 @@ func toTopNResponse(dps []*measurev1.DataPoint) *measurev1.TopNResponse {
topNItems := make([]*measurev1.TopNList_Item, len(dps))
for i, dp := range dps {
topNItems[i] = &measurev1.TopNList_Item{
- Entity: dp.GetTagFamilies()[0].GetTags(),
- Value: dp.GetFields()[0].GetValue(),
+ Entity: dp.GetTagFamilies()[0].GetTags(),
+ Value: dp.GetFields()[0].GetValue(),
+ Version: dp.GetVersion(),
+ Timestamp: dp.GetTimestamp(),
}
}
topNList = append(topNList, &measurev1.TopNList{
@@ -195,269 +207,3 @@ func toTopNResponse(dps []*measurev1.DataPoint) *measurev1.TopNResponse {
})
return &measurev1.TopNResponse{Lists: topNList}
}
-
-var _ heap.Interface = (*postAggregationProcessor)(nil)
-
-type aggregatorItem struct {
- int64Func aggregation.Func[int64]
- key string
- values pbv1.EntityValues
- index int
-}
-
-func (n *aggregatorItem) GetTags(tagNames []string) []*modelv1.Tag {
- tags := make([]*modelv1.Tag, len(n.values))
- for i := 0; i < len(tags); i++ {
- tags[i] = &modelv1.Tag{
- Key: tagNames[i],
- Value: n.values[i],
- }
- }
- return tags
-}
-
-// PostProcessor defines necessary methods for Top-N post processor with or without aggregation.
-type PostProcessor interface {
- Put(entityValues pbv1.EntityValues, val int64, timestampMillis uint64) error
- Val([]string) []*measurev1.TopNList
-}
-
-// CreateTopNPostAggregator creates a Top-N post processor with or without aggregation.
-func CreateTopNPostAggregator(topN int32, aggrFunc modelv1.AggregationFunction, sort modelv1.Sort) PostProcessor {
- if aggrFunc == modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED {
- // if aggregation is not specified, we have to keep all timelines
- return &postNonAggregationProcessor{
- topN: topN,
- sort: sort,
- timelines: make(map[uint64]*flow.DedupPriorityQueue),
- }
- }
- aggregator := &postAggregationProcessor{
- topN: topN,
- sort: sort,
- aggrFunc: aggrFunc,
- cache: make(map[string]*aggregatorItem),
- items: make([]*aggregatorItem, 0, topN),
- }
- heap.Init(aggregator)
- return aggregator
-}
-
-// postAggregationProcessor is an implementation of postProcessor with aggregation.
-type postAggregationProcessor struct {
- cache map[string]*aggregatorItem
- items []*aggregatorItem
- latestTimestamp uint64
- topN int32
- sort modelv1.Sort
- aggrFunc modelv1.AggregationFunction
-}
-
-func (aggr postAggregationProcessor) Len() int {
- return len(aggr.items)
-}
-
-// Less reports whether min/max heap has to be built.
-// For DESC, a min heap has to be built,
-// while for ASC, a max heap has to be built.
-func (aggr postAggregationProcessor) Less(i, j int) bool {
- if aggr.sort == modelv1.Sort_SORT_DESC {
- return aggr.items[i].int64Func.Val() < aggr.items[j].int64Func.Val()
- }
- return aggr.items[i].int64Func.Val() > aggr.items[j].int64Func.Val()
-}
-
-func (aggr *postAggregationProcessor) Swap(i, j int) {
- aggr.items[i], aggr.items[j] = aggr.items[j], aggr.items[i]
- aggr.items[i].index = i
- aggr.items[j].index = j
-}
-
-func (aggr *postAggregationProcessor) Push(x any) {
- n := len(aggr.items)
- item := x.(*aggregatorItem)
- item.index = n
- aggr.items = append(aggr.items, item)
-}
-
-func (aggr *postAggregationProcessor) Pop() any {
- old := aggr.items
- n := len(old)
- item := old[n-1]
- old[n-1] = nil
- item.index = -1
- aggr.items = old[0 : n-1]
- return item
-}
-
-func (aggr *postAggregationProcessor) Put(entityValues pbv1.EntityValues, val int64, timestampMillis uint64) error {
- // update latest ts
- if aggr.latestTimestamp < timestampMillis {
- aggr.latestTimestamp = timestampMillis
- }
- key := entityValues.String()
- if item, found := aggr.cache[key]; found {
- item.int64Func.In(val)
- return nil
- }
-
- aggrFunc, err := aggregation.NewFunc[int64](aggr.aggrFunc)
- if err != nil {
- return err
- }
- item := &aggregatorItem{
- key: key,
- int64Func: aggrFunc,
- values: entityValues,
- }
- item.int64Func.In(val)
-
- if aggr.Len() < int(aggr.topN) {
- aggr.cache[key] = item
- heap.Push(aggr, item)
- } else {
- aggr.tryEnqueue(key, item)
- }
-
- return nil
-}
-
-func (aggr *postAggregationProcessor) tryEnqueue(key string, item *aggregatorItem) {
- if lowest := aggr.items[0]; lowest != nil {
- if aggr.sort == modelv1.Sort_SORT_DESC && lowest.int64Func.Val() < item.int64Func.Val() {
- aggr.cache[key] = item
- aggr.items[0] = item
- heap.Fix(aggr, 0)
- } else if aggr.sort != modelv1.Sort_SORT_DESC && lowest.int64Func.Val() > item.int64Func.Val() {
- aggr.cache[key] = item
- aggr.items[0] = item
- heap.Fix(aggr, 0)
- }
- }
-}
-
-func (aggr *postAggregationProcessor) Val(tagNames []string) []*measurev1.TopNList {
- topNItems := make([]*measurev1.TopNList_Item, aggr.Len())
-
- for aggr.Len() > 0 {
- item := heap.Pop(aggr).(*aggregatorItem)
- topNItems[aggr.Len()] = &measurev1.TopNList_Item{
- Entity: item.GetTags(tagNames),
- Value: &modelv1.FieldValue{
- Value: &modelv1.FieldValue_Int{
- Int: &modelv1.Int{Value: item.int64Func.Val()},
- },
- },
- }
- }
- return []*measurev1.TopNList{
- {
- Timestamp: timestamppb.New(time.Unix(0, int64(aggr.latestTimestamp))),
- Items: topNItems,
- },
- }
-}
-
-var _ flow.Element = (*nonAggregatorItem)(nil)
-
-type nonAggregatorItem struct {
- key string
- values pbv1.EntityValues
- val int64
- index int
-}
-
-func (n *nonAggregatorItem) GetTags(tagNames []string) []*modelv1.Tag {
- tags := make([]*modelv1.Tag, len(n.values))
- for i := 0; i < len(tags); i++ {
- tags[i] = &modelv1.Tag{
- Key: tagNames[i],
- Value: n.values[i],
- }
- }
- return tags
-}
-
-func (n *nonAggregatorItem) GetIndex() int {
- return n.index
-}
-
-func (n *nonAggregatorItem) SetIndex(i int) {
- n.index = i
-}
-
-type postNonAggregationProcessor struct {
- timelines map[uint64]*flow.DedupPriorityQueue
- topN int32
- sort modelv1.Sort
-}
-
-func (naggr *postNonAggregationProcessor) Val(tagNames []string) []*measurev1.TopNList {
- topNLists := make([]*measurev1.TopNList, 0, len(naggr.timelines))
- for ts, timeline := range naggr.timelines {
- items := make([]*measurev1.TopNList_Item, timeline.Len())
- for idx, elem := range timeline.Values() {
- items[idx] = &measurev1.TopNList_Item{
- Entity: elem.(*nonAggregatorItem).GetTags(tagNames),
- Value: &modelv1.FieldValue{
- Value: &modelv1.FieldValue_Int{
- Int: &modelv1.Int{Value: elem.(*nonAggregatorItem).val},
- },
- },
- }
- }
- topNLists = append(topNLists, &measurev1.TopNList{
- Timestamp: timestamppb.New(time.Unix(0, int64(ts))),
- Items: items,
- })
- }
-
- slices.SortStableFunc(topNLists, func(a, b *measurev1.TopNList) int {
- r := int(a.GetTimestamp().GetSeconds() - b.GetTimestamp().GetSeconds())
- if r != 0 {
- return r
- }
- return int(a.GetTimestamp().GetNanos() - b.GetTimestamp().GetNanos())
- })
-
- return topNLists
-}
-
-func (naggr *postNonAggregationProcessor) Put(entityValues pbv1.EntityValues, val int64, timestampMillis uint64) error {
- key := entityValues.String()
- if timeline, ok := naggr.timelines[timestampMillis]; ok {
- if timeline.Len() < int(naggr.topN) {
- heap.Push(timeline, &nonAggregatorItem{val: val, key: key, values: entityValues})
- } else {
- if lowest := timeline.Peek(); lowest != nil {
- if naggr.sort == modelv1.Sort_SORT_DESC && lowest.(*nonAggregatorItem).val < val {
- timeline.ReplaceLowest(&nonAggregatorItem{val: val, key: key, values: entityValues})
- } else if naggr.sort != modelv1.Sort_SORT_DESC && lowest.(*nonAggregatorItem).val > val {
- timeline.ReplaceLowest(&nonAggregatorItem{val: val, key: key, values: entityValues})
- }
- }
- }
- return nil
- }
-
- timeline := flow.NewPriorityQueue(func(a, b interface{}) int {
- if naggr.sort == modelv1.Sort_SORT_DESC {
- if a.(*nonAggregatorItem).val < b.(*nonAggregatorItem).val {
- return -1
- } else if a.(*nonAggregatorItem).val == b.(*nonAggregatorItem).val {
- return 0
- }
- return 1
- }
- if a.(*nonAggregatorItem).val < b.(*nonAggregatorItem).val {
- return 1
- } else if a.(*nonAggregatorItem).val == b.(*nonAggregatorItem).val {
- return 0
- }
- return -1
- }, false)
- naggr.timelines[timestampMillis] = timeline
- heap.Push(timeline, &nonAggregatorItem{val: val, key: key, values: entityValues})
-
- return nil
-}
diff --git a/pkg/query/logical/measure/topn_analyzer.go b/pkg/query/logical/measure/topn_analyzer.go
index 4ac59297..34aced93 100644
--- a/pkg/query/logical/measure/topn_analyzer.go
+++ b/pkg/query/logical/measure/topn_analyzer.go
@@ -59,6 +59,7 @@ func TopNAnalyze(criteria *measurev1.TopNRequest, sourceMeasureSchemaList []*dat
sort: criteria.FieldValueSort,
groupByTags: topNAggSchema.GetGroupByTagNames(),
ec: ec,
+ number: criteria.GetTopN(),
}
} else {
subPlans := make([]*unresolvedLocalScan, 0, len(sourceMeasureSchemaList))
@@ -71,6 +72,7 @@ func TopNAnalyze(criteria *measurev1.TopNRequest, sourceMeasureSchemaList []*dat
sort: criteria.FieldValueSort,
groupByTags: topNAggSchemaList[i].GetGroupByTagNames(),
ec: ecc[i],
+ number: criteria.GetTopN(),
})
}
plan = &unresolvedTopNMerger{
diff --git a/pkg/query/logical/measure/topn_plan_localscan.go b/pkg/query/logical/measure/topn_plan_localscan.go
index 23907764..be85e760 100644
--- a/pkg/query/logical/measure/topn_plan_localscan.go
+++ b/pkg/query/logical/measure/topn_plan_localscan.go
@@ -31,9 +31,7 @@ import (
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/measure"
- "github.com/apache/skywalking-banyandb/pkg/encoding"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
- "github.com/apache/skywalking-banyandb/pkg/pool"
"github.com/apache/skywalking-banyandb/pkg/query/executor"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
"github.com/apache/skywalking-banyandb/pkg/query/model"
@@ -53,6 +51,7 @@ type unresolvedLocalScan struct {
conditions []*modelv1.Condition
groupByTags []string
sort modelv1.Sort
+ number int32
}
func (uls *unresolvedLocalScan) Analyze(s logical.Schema) (logical.Plan, error) {
@@ -103,6 +102,8 @@ func (uls *unresolvedLocalScan) Analyze(s logical.Schema) (logical.Plan, error)
},
},
FieldProjection: fieldProjection,
+ Sort: uls.sort,
+ Number: uls.number,
},
ec: uls.ec,
}, nil
@@ -195,8 +196,8 @@ func (ei *topNMIterator) Next() bool {
ei.current = ei.current[:0]
topNValue := measure.GenerateTopNValue()
defer measure.ReleaseTopNValue(topNValue)
- decoder := generateTopNValuesDecoder()
- defer releaseTopNValuesDecoder(decoder)
+ decoder := measure.GenerateTopNValuesDecoder()
+ defer measure.ReleaseTopNValuesDecoder(decoder)
for i := range r.Timestamps {
fv := r.Fields[0].Values[i]
@@ -256,18 +257,3 @@ func (ei *topNMIterator) Close() error {
}
return ei.err
}
-
-func generateTopNValuesDecoder() *encoding.BytesBlockDecoder {
- v := topNValuesDecoderPool.Get()
- if v == nil {
- return &encoding.BytesBlockDecoder{}
- }
- return v
-}
-
-func releaseTopNValuesDecoder(d *encoding.BytesBlockDecoder) {
- d.Reset()
- topNValuesDecoderPool.Put(d)
-}
-
-var topNValuesDecoderPool = pool.Register[*encoding.BytesBlockDecoder]("topn-valueDecoder")
diff --git a/pkg/query/model/model.go b/pkg/query/model/model.go
index 3d00a0b6..4aa313ea 100644
--- a/pkg/query/model/model.go
+++ b/pkg/query/model/model.go
@@ -64,6 +64,8 @@ type MeasureQueryOptions struct {
Entities [][]*modelv1.TagValue
TagProjection []TagProjection
FieldProjection []string
+ Sort modelv1.Sort
+ Number int32
}
// MeasureResult is the result of a query.
diff --git a/pkg/test/measure/testdata/measures/endpoint_resp_time_minute.json b/pkg/test/measure/testdata/measures/endpoint_resp_time_minute.json
new file mode 100644
index 00000000..1de19aae
--- /dev/null
+++ b/pkg/test/measure/testdata/measures/endpoint_resp_time_minute.json
@@ -0,0 +1,37 @@
+{
+ "metadata": {
+ "name": "endpoint_resp_time_minute",
+ "group": "sw_metric"
+ },
+ "tag_families": [
+ {
+ "name": "default",
+ "tags": [
+ {
+ "name": "entity_id",
+ "type": "TAG_TYPE_STRING"
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "name": "value",
+ "field_type": "FIELD_TYPE_INT",
+ "encoding_method": "ENCODING_METHOD_GORILLA",
+ "compression_method": "COMPRESSION_METHOD_ZSTD"
+ }
+ ],
+ "entity": {
+ "tag_names": [
+ "entity_id"
+ ]
+ },
+ "sharding_key": {
+ "tag_names": [
+ "entity_id"
+ ]
+ },
+ "interval": "1m",
+ "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/measure/testdata/topn_aggregations/endpoint_resp_time_minute_top_bottom_100.json b/pkg/test/measure/testdata/topn_aggregations/endpoint_resp_time_minute_top_bottom_100.json
new file mode 100644
index 00000000..af2ec806
--- /dev/null
+++ b/pkg/test/measure/testdata/topn_aggregations/endpoint_resp_time_minute_top_bottom_100.json
@@ -0,0 +1,17 @@
+{
+ "metadata": {
+ "name": "endpoint_resp_time_minute_top_bottom_100",
+ "group": "sw_metric"
+ },
+ "source_measure": {
+ "name": "endpoint_resp_time_minute",
+ "group": "sw_metric"
+ },
+ "field_name": "value",
+ "field_value_sort": 0,
+ "group_by_tag_names": [
+ "entity_id"
+ ],
+ "counters_number": 1000,
+ "lru_size": 10
+}
\ No newline at end of file
diff --git a/test/cases/init.go b/test/cases/init.go
index 3ec50509..71f3f520 100644
--- a/test/cases/init.go
+++ b/test/cases/init.go
@@ -98,6 +98,8 @@ func Initialize(addr string, now time.Time) {
casesmeasuredata.Write(conn, "endpoint_traffic", "sw_metric",
"endpoint_traffic.json", now, interval)
casesmeasuredata.Write(conn, "duplicated", "exception",
"duplicated.json", now, 0)
casesmeasuredata.Write(conn, "service_cpm_minute", "sw_updated",
"service_cpm_minute_updated_data.json", now.Add(10*time.Minute), interval)
+ casesmeasuredata.Write(conn, "endpoint_resp_time_minute", "sw_metric",
"endpoint_resp_time_minute_data.json", now, interval)
+ casesmeasuredata.Write(conn, "endpoint_resp_time_minute", "sw_metric",
"endpoint_resp_time_minute_data1.json", now.Add(10*time.Second), interval)
casesmeasuredata.WriteMixed(conn, now.Add(30*time.Minute), interval,
casesmeasuredata.WriteSpec{
Metadata: &commonv1.Metadata{Name: "service_cpm_minute", Group: "sw_spec"},
diff --git a/test/cases/measure/data/testdata/endpoint_resp_time_minute_data.json b/test/cases/measure/data/testdata/endpoint_resp_time_minute_data.json
new file mode 100644
index 00000000..59333363
--- /dev/null
+++ b/test/cases/measure/data/testdata/endpoint_resp_time_minute_data.json
@@ -0,0 +1,102 @@
+[
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "entity_1"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 100
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "entity_2"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 200
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "entity_3"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 300
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "entity_4"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 400
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "entity_5"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 500
+ }
+ }
+ ]
+ }
+]
diff --git a/test/cases/measure/data/testdata/endpoint_resp_time_minute_data1.json b/test/cases/measure/data/testdata/endpoint_resp_time_minute_data1.json
new file mode 100644
index 00000000..efe572fe
--- /dev/null
+++ b/test/cases/measure/data/testdata/endpoint_resp_time_minute_data1.json
@@ -0,0 +1,102 @@
+[
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "entity_1"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 1000
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "entity_2"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 50
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "entity_3"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 230
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "entity_4"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 80
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "entity_5"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 600
+ }
+ }
+ ]
+ }
+]
diff --git a/test/cases/topn/data/input/aggr_version_merged.ql b/test/cases/topn/data/input/aggr_version_merged.ql
new file mode 100644
index 00000000..596cb846
--- /dev/null
+++ b/test/cases/topn/data/input/aggr_version_merged.ql
@@ -0,0 +1,22 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+SHOW TOP 3
+FROM MEASURE endpoint_resp_time_minute_top_bottom_100 IN sw_metric
+TIME > '-15m'
+AGGREGATE BY MAX
+ORDER BY DESC
diff --git a/test/cases/topn/data/input/aggr_version_merged.yaml b/test/cases/topn/data/input/aggr_version_merged.yaml
new file mode 100644
index 00000000..fb76e724
--- /dev/null
+++ b/test/cases/topn/data/input/aggr_version_merged.yaml
@@ -0,0 +1,22 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "endpoint_resp_time_minute_top_bottom_100"
+groups: ["sw_metric"]
+topN: 3
+fieldValueSort: 1
+agg: 2
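
The numeric enum values above encode the `.ql` file's `AGGREGATE BY MAX ... ORDER BY DESC` — assuming the standard banyandb model enums in which `SORT_DESC` is 1 and `AGGREGATION_FUNCTION_MAX` is 2, which can be spot-checked against the generated constants:

package main

import (
	"fmt"

	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
)

func main() {
	fmt.Println(int32(modelv1.Sort_SORT_DESC))                               // expected: 1
	fmt.Println(int32(modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX)) // expected: 2
}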
diff --git a/test/cases/topn/data/want/aggr_version_merged.yaml b/test/cases/topn/data/want/aggr_version_merged.yaml
new file mode 100644
index 00000000..58d3060c
--- /dev/null
+++ b/test/cases/topn/data/want/aggr_version_merged.yaml
@@ -0,0 +1,43 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+lists:
+ - items:
+ - entity:
+ - key: entity_id
+ value:
+ str:
+ value: entity_1
+ value:
+ int:
+ value: "1000"
+ - entity:
+ - key: entity_id
+ value:
+ str:
+ value: entity_5
+ value:
+ int:
+ value: "600"
+ - entity:
+ - key: entity_id
+ value:
+ str:
+ value: entity_3
+ value:
+ int:
+ value: "230"
diff --git a/test/cases/topn/topn.go b/test/cases/topn/topn.go
index bcb1d39b..51b983eb 100644
--- a/test/cases/topn/topn.go
+++ b/test/cases/topn/topn.go
@@ -48,4 +48,5 @@ var _ = g.DescribeTable("TopN Tests", verify,
g.Entry("using not equal in aggregation", helpers.Args{Input: "ne",
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
g.Entry("using in operation in aggregation", helpers.Args{Input: "in",
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
g.Entry("using not-in operation in aggregation", helpers.Args{Input:
"not_in", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+ g.Entry("max top3 with version merged order by desc",
helpers.Args{Input: "aggr_version_merged", Duration: 25 * time.Minute, Offset:
-20 * time.Minute}),
)