This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new b3d1f2f7 Refactor TopN Aggregation Result Query Process (#937)
b3d1f2f7 is described below

commit b3d1f2f7c174d67ece9cc6c8583cfbff8e84f540
Author: peachisai <[email protected]>
AuthorDate: Mon Jan 19 09:05:17 2026 +0800

    Refactor TopN Aggregation Result Query Process (#937)
    
    * Refactor TopN Aggregation Result Query Process
---
 api/proto/banyandb/measure/v1/topn.proto           |   2 +
 banyand/dquery/topn.go                             |  30 +-
 banyand/measure/block.go                           | 111 ++++++-
 banyand/measure/query.go                           |  24 +-
 banyand/measure/topn.go                            |  25 +-
 banyand/measure/topn_post_processor.go             | 344 ++++++++++++++++++++
 banyand/measure/topn_post_processor_test.go        | 157 +++++++++
 banyand/query/processor_topn.go                    | 356 +++------------------
 pkg/query/logical/measure/topn_analyzer.go         |   2 +
 pkg/query/logical/measure/topn_plan_localscan.go   |  24 +-
 pkg/query/model/model.go                           |   2 +
 .../measures/endpoint_resp_time_minute.json        |  37 +++
 .../endpoint_resp_time_minute_top_bottom_100.json  |  17 +
 test/cases/init.go                                 |   2 +
 .../testdata/endpoint_resp_time_minute_data.json   | 102 ++++++
 .../testdata/endpoint_resp_time_minute_data1.json  | 102 ++++++
 test/cases/topn/data/input/aggr_version_merged.ql  |  22 ++
 .../cases/topn/data/input/aggr_version_merged.yaml |  22 ++
 test/cases/topn/data/want/aggr_version_merged.yaml |  43 +++
 test/cases/topn/topn.go                            |   1 +
 20 files changed, 1082 insertions(+), 343 deletions(-)

diff --git a/api/proto/banyandb/measure/v1/topn.proto 
b/api/proto/banyandb/measure/v1/topn.proto
index 48afc024..712932de 100644
--- a/api/proto/banyandb/measure/v1/topn.proto
+++ b/api/proto/banyandb/measure/v1/topn.proto
@@ -35,6 +35,8 @@ message TopNList {
   message Item {
     repeated model.v1.Tag entity = 1;
     model.v1.FieldValue value = 2;
+    int64 version = 3;
+    google.protobuf.Timestamp timestamp = 4;
   }
   // items contains top-n items in a list
   repeated Item items = 2;
diff --git a/banyand/dquery/topn.go b/banyand/dquery/topn.go
index d96b78c2..d50290ba 100644
--- a/banyand/dquery/topn.go
+++ b/banyand/dquery/topn.go
@@ -29,7 +29,6 @@ import (
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/measure"
-       "github.com/apache/skywalking-banyandb/banyand/query"
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/iter/sort"
@@ -55,12 +54,8 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, 
message bus.Message) (resp
        }
        n := time.Now()
        now := bus.MessageID(request.TimeRange.Begin.Nanos)
-       if request.GetFieldValueSort() == modelv1.Sort_SORT_UNSPECIFIED {
-               resp = bus.NewMessage(now, common.NewError("unspecified 
requested sort direction"))
-               return
-       }
-       if request.GetAgg() == 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED {
-               resp = bus.NewMessage(now, common.NewError("unspecified 
requested aggregation function"))
+       if err := t.validateRequest(request); err != nil {
+               resp = bus.NewMessage(now, common.NewError("%s", err.Error()))
                return
        }
        if e := t.log.Debug(); e.Enabled() {
@@ -116,7 +111,7 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, 
message bus.Message) (resp
                return
        }
        var allErr error
-       aggregator := query.CreateTopNPostAggregator(request.GetTopN(),
+       aggregator := measure.CreateTopNPostProcessor(request.GetTopN(),
                agg, request.GetFieldValueSort())
        var tags []string
        var responseCount int
@@ -142,7 +137,7 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, 
message bus.Message) (resp
                                        for _, e := range tn.Entity {
                                                entityValues = 
append(entityValues, e.Value)
                                        }
-                                       _ = aggregator.Put(entityValues, 
tn.Value.GetInt().GetValue(), uint64(l.Timestamp.AsTime().UnixMilli()))
+                                       aggregator.Put(entityValues, 
tn.Value.GetInt().GetValue(), uint64(tn.Timestamp.AsTime().UnixMilli()), 
tn.Version)
                                }
                        }
                }
@@ -158,7 +153,12 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, 
message bus.Message) (resp
                resp = bus.NewMessage(now, &measurev1.TopNResponse{})
                return
        }
-       lists := aggregator.Val(tags)
+       lists, err := aggregator.Val(tags)
+       if err != nil {
+               resp = bus.NewMessage(now, common.NewError("failed to 
post-aggregate %s: %v", request.GetName(), err))
+               return
+       }
+
        if span != nil {
                span.Tagf("list_count", "%d", len(lists))
        }
@@ -206,3 +206,13 @@ func (s *sortedTopNList) Next() bool {
 func (s *sortedTopNList) Val() *comparableTopNItem {
        return &comparableTopNItem{s.Items[s.index-1]}
 }
+
+func (t *topNQueryProcessor) validateRequest(request *measurev1.TopNRequest) 
error {
+       if request.GetFieldValueSort() == modelv1.Sort_SORT_UNSPECIFIED {
+               return errors.New("unspecified requested sort direction")
+       }
+       if request.GetAgg() == 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED {
+               return errors.New("unspecified requested aggregation function")
+       }
+       return nil
+}
diff --git a/banyand/measure/block.go b/banyand/measure/block.go
index 59f50241..24c9cd48 100644
--- a/banyand/measure/block.go
+++ b/banyand/measure/block.go
@@ -666,9 +666,118 @@ func (bc *blockCursor) copyTo(r *model.MeasureResult, 
storedIndexValue map[commo
        }
 }
 
+func (bc *blockCursor) mergeTopNResult(r *model.MeasureResult, 
storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue,
+       topNPostAggregator PostProcessor,
+) {
+       r.SID = bc.bm.seriesID
+       var indexValue map[string]*modelv1.TagValue
+       if storedIndexValue != nil {
+               indexValue = storedIndexValue[r.SID]
+       }
+       for i := range r.TagFamilies {
+               tfName := r.TagFamilies[i].Name
+               var cf *columnFamily
+               for j := range r.TagFamilies[i].Tags {
+                       tagName := r.TagFamilies[i].Tags[j].Name
+                       if indexValue != nil && indexValue[tagName] != nil {
+                               
r.TagFamilies[i].Tags[j].Values[len(r.TagFamilies[i].Tags[j].Values)-1] = 
indexValue[tagName]
+                               continue
+                       }
+                       if cf == nil {
+                               for i := range bc.tagFamilies {
+                                       if bc.tagFamilies[i].name == tfName {
+                                               cf = &bc.tagFamilies[i]
+                                               break
+                                       }
+                               }
+                       }
+                       for _, c := range cf.columns {
+                               if c.name == tagName {
+                                       schemaType, hasSchemaType := 
bc.schemaTagTypes[tagName]
+                                       if hasSchemaType && c.valueType == 
schemaType {
+                                               
r.TagFamilies[i].Tags[j].Values[len(r.TagFamilies[i].Tags[j].Values)-1] = 
mustDecodeTagValue(c.valueType, c.values[bc.idx])
+                                       } else {
+                                               
r.TagFamilies[i].Tags[j].Values[len(r.TagFamilies[i].Tags[j].Values)-1] = 
pbv1.NullTagValue
+                                       }
+                                       break
+                               }
+                       }
+               }
+       }
+
+       topNValue := GenerateTopNValue()
+       defer ReleaseTopNValue(topNValue)
+       decoder := GenerateTopNValuesDecoder()
+       defer ReleaseTopNValuesDecoder(decoder)
+
+       uTimestamps := uint64(bc.timestamps[bc.idx])
+
+       for i, c := range bc.fields.columns {
+               srcFieldValue := r.Fields[i].Values[len(r.Fields[i].Values)-1]
+               destFieldValue := mustDecodeFieldValue(c.valueType, 
c.values[bc.idx])
+
+               topNValue.Reset()
+
+               if err := topNValue.Unmarshal(srcFieldValue.GetBinaryData(), 
decoder); err != nil {
+                       log.Error().Err(err).Msg("failed to unmarshal topN 
value, skip current batch")
+                       continue
+               }
+
+               valueName := topNValue.valueName
+               entityTagNames := topNValue.entityTagNames
+
+               for j, entityList := range topNValue.entities {
+                       entityValues := make(pbv1.EntityValues, 0, 
len(entityList))
+                       for _, e := range entityList {
+                               entityValues = append(entityValues, e)
+                       }
+                       topNPostAggregator.Put(entityValues, 
topNValue.values[j], uTimestamps, r.Versions[len(r.Versions)-1])
+               }
+
+               topNValue.Reset()
+               if err := topNValue.Unmarshal(destFieldValue.GetBinaryData(), 
decoder); err != nil {
+                       log.Error().Err(err).Msg("failed to unmarshal topN 
value, skip current batch")
+                       continue
+               }
+
+               for j, entityList := range topNValue.entities {
+                       entityValues := make(pbv1.EntityValues, 0, 
len(entityList))
+                       for _, e := range entityList {
+                               entityValues = append(entityValues, e)
+                       }
+                       topNPostAggregator.Put(entityValues, 
topNValue.values[j], uTimestamps, bc.versions[bc.idx])
+               }
+
+               items, err := topNPostAggregator.Flush()
+               if err != nil {
+                       log.Error().Err(err).Msg("failed to flush aggregator, 
skip current batch")
+                       continue
+               }
+
+               topNValue.Reset()
+               topNValue.setMetadata(valueName, entityTagNames)
+
+               for _, item := range items {
+                       topNValue.addValue(item.val, item.values)
+               }
+
+               buf, err := topNValue.marshal(make([]byte, 0, 128))
+               if err != nil {
+                       log.Error().Err(err).Msg("failed to marshal topN value, 
skip current batch")
+                       continue
+               }
+
+               r.Fields[i].Values[len(r.Fields[i].Values)-1] = 
&modelv1.FieldValue{
+                       Value: &modelv1.FieldValue_BinaryData{
+                               BinaryData: buf,
+                       },
+               }
+               r.Versions[len(r.Versions)-1] = bc.versions[bc.idx]
+       }
+}
+
 func (bc *blockCursor) replace(r *model.MeasureResult, storedIndexValue 
map[common.SeriesID]map[string]*modelv1.TagValue) {
        r.SID = bc.bm.seriesID
-       r.Timestamps[len(r.Timestamps)-1] = bc.timestamps[bc.idx]
        r.Versions[len(r.Versions)-1] = bc.versions[bc.idx]
        var indexValue map[string]*modelv1.TagValue
        if storedIndexValue != nil {
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index 426c5963..22fa8aa4 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -71,6 +71,11 @@ type queryOptions struct {
        maxTimestamp int64
 }
 
+type topNQueryOptions struct {
+       sortDirection modelv1.Sort
+       number        int32
+}
+
 func (m *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) 
(mqr model.MeasureQueryResult, err error) {
        if mqo.TimeRange == nil {
                return nil, errors.New("invalid query options: timeRange are 
required")
@@ -190,6 +195,13 @@ func (m *measure) Query(ctx context.Context, mqo 
model.MeasureQueryOptions) (mqr
                }
        }
 
+       if mqo.Name == "_top_n_result" {
+               result.topNQueryOptions = &topNQueryOptions{
+                       sortDirection: mqo.Sort,
+                       number:        mqo.Number,
+               }
+       }
+
        return &result, nil
 }
 
@@ -675,6 +687,7 @@ func binaryDataFieldValue(value []byte) *modelv1.FieldValue 
{
 
 type queryResult struct {
        ctx              context.Context
+       topNQueryOptions *topNQueryOptions
        sidToIndex       map[common.SeriesID]int
        storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue
        tagProjection    []model.TagProjection
@@ -842,6 +855,13 @@ func (qr *queryResult) merge(storedIndexValue 
map[common.SeriesID]map[string]*mo
        var lastVersion int64
        var lastSid common.SeriesID
 
+       var topNPostAggregator PostProcessor
+
+       if qr.topNQueryOptions != nil {
+               topNPostAggregator = 
CreateTopNPostProcessor(qr.topNQueryOptions.number, 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED,
+                       qr.topNQueryOptions.sortDirection)
+       }
+
        for qr.Len() > 0 {
                topBC := qr.data[0]
                if lastSid != 0 && topBC.bm.seriesID != lastSid {
@@ -851,7 +871,9 @@ func (qr *queryResult) merge(storedIndexValue 
map[common.SeriesID]map[string]*mo
 
                if len(result.Timestamps) > 0 &&
                        topBC.timestamps[topBC.idx] == 
result.Timestamps[len(result.Timestamps)-1] {
-                       if topBC.versions[topBC.idx] > lastVersion {
+                       if topNPostAggregator != nil {
+                               topBC.mergeTopNResult(result, storedIndexValue, 
topNPostAggregator)
+                       } else if topBC.versions[topBC.idx] > lastVersion {
                                topBC.replace(result, storedIndexValue)
                        }
                } else {
diff --git a/banyand/measure/topn.go b/banyand/measure/topn.go
index d595b105..3c042017 100644
--- a/banyand/measure/topn.go
+++ b/banyand/measure/topn.go
@@ -386,13 +386,6 @@ func (t *topNStreamingProcessor) writeStreamRecord(record 
flow.StreamRecord, buf
                                        },
                                },
                        },
-                       {
-                               Value: &modelv1.TagValue_Str{
-                                       Str: &modelv1.Str{
-                                               Value: t.nodeID,
-                                       },
-                               },
-                       },
                }
                buf = buf[:0]
                if buf, err = topNValue.marshal(buf); err != nil {
@@ -414,6 +407,7 @@ func (t *topNStreamingProcessor) writeStreamRecord(record 
flow.StreamRecord, buf
                                                        },
                                                },
                                        },
+                                       Version: time.Now().UnixNano(),
                                },
                        },
                        EntityValues: entityValues,
@@ -953,3 +947,20 @@ func (t *TopNValue) Unmarshal(src []byte, decoder 
*encoding.BytesBlockDecoder) e
 func GroupName(groupTags []string) string {
        return strings.Join(groupTags, "|")
 }
+
+// GenerateTopNValuesDecoder returns a new decoder instance of TopNValues.
+func GenerateTopNValuesDecoder() *encoding.BytesBlockDecoder {
+       v := topNValuesDecoderPool.Get()
+       if v == nil {
+               return &encoding.BytesBlockDecoder{}
+       }
+       return v
+}
+
+// ReleaseTopNValuesDecoder releases a decoder instance of TopNValues.
+func ReleaseTopNValuesDecoder(d *encoding.BytesBlockDecoder) {
+       d.Reset()
+       topNValuesDecoderPool.Put(d)
+}
+
+var topNValuesDecoderPool = 
pool.Register[*encoding.BytesBlockDecoder]("topn-valueDecoder")
diff --git a/banyand/measure/topn_post_processor.go 
b/banyand/measure/topn_post_processor.go
new file mode 100644
index 00000000..fde0f5a4
--- /dev/null
+++ b/banyand/measure/topn_post_processor.go
@@ -0,0 +1,344 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "container/heap"
+       "slices"
+       "time"
+
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/flow"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/aggregation"
+)
+
+// PostProcessor defines necessary methods for Top-N post processor with or 
without aggregation.
+type PostProcessor interface {
+       Put(entityValues pbv1.EntityValues, val int64, timestampMillis uint64, 
version int64)
+       Flush() ([]*topNAggregatorItem, error)
+       Val([]string) ([]*measurev1.TopNList, error)
+}
+
+// CreateTopNPostProcessor creates a Top-N post processor with or without 
aggregation.
+func CreateTopNPostProcessor(topN int32, aggrFunc modelv1.AggregationFunction, 
sort modelv1.Sort) PostProcessor {
+       if aggrFunc == 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED {
+               // if aggregation is not specified, we have to keep all 
timelines
+               return &topNPostProcessor{
+                       topN:      topN,
+                       sort:      sort,
+                       timelines: make(map[uint64]*topNTimelineItem),
+               }
+       }
+       aggregator := &topNPostProcessor{
+               topN:      topN,
+               sort:      sort,
+               aggrFunc:  aggrFunc,
+               cache:     make(map[string]*topNAggregatorItem),
+               timelines: make(map[uint64]*topNTimelineItem),
+               items:     make([]*topNAggregatorItem, 0, topN),
+       }
+       heap.Init(aggregator)
+       return aggregator
+}
+
+func (taggr *topNPostProcessor) Len() int {
+       return len(taggr.items)
+}
+
+// Less reports whether min/max heap has to be built.
+// For DESC, a min heap has to be built,
+// while for ASC, a max heap has to be built.
+func (taggr *topNPostProcessor) Less(i, j int) bool {
+       if taggr.sort == modelv1.Sort_SORT_DESC {
+               return taggr.items[i].int64Func.Val() < 
taggr.items[j].int64Func.Val()
+       }
+       return taggr.items[i].int64Func.Val() > taggr.items[j].int64Func.Val()
+}
+
+func (taggr *topNPostProcessor) Swap(i, j int) {
+       taggr.items[i], taggr.items[j] = taggr.items[j], taggr.items[i]
+       taggr.items[i].index = i
+       taggr.items[j].index = j
+}
+
+func (taggr *topNPostProcessor) Push(x any) {
+       n := len(taggr.items)
+       item := x.(*topNAggregatorItem)
+       item.index = n
+       taggr.items = append(taggr.items, item)
+}
+
+func (taggr *topNPostProcessor) Pop() any {
+       old := taggr.items
+       n := len(old)
+       item := old[n-1]
+       old[n-1] = nil
+       item.index = -1
+       taggr.items = old[0 : n-1]
+       return item
+}
+
+func (taggr *topNPostProcessor) tryEnqueue(key string, item 
*topNAggregatorItem) {
+       if lowest := taggr.items[0]; lowest != nil {
+               shouldReplace := (taggr.sort == modelv1.Sort_SORT_DESC && 
lowest.int64Func.Val() < item.int64Func.Val()) ||
+                       (taggr.sort != modelv1.Sort_SORT_DESC && 
lowest.int64Func.Val() > item.int64Func.Val())
+
+               if shouldReplace {
+                       delete(taggr.cache, lowest.key)
+                       taggr.cache[key] = item
+                       taggr.items[0] = item
+                       item.index = 0
+                       heap.Fix(taggr, 0)
+               }
+       }
+}
+
+var _ flow.Element = (*topNAggregatorItem)(nil)
+
+type topNAggregatorItem struct {
+       int64Func aggregation.Func[int64]
+       key       string
+       values    pbv1.EntityValues
+       val       int64
+       version   int64
+       index     int
+}
+
+func (n *topNAggregatorItem) GetTags(tagNames []string) []*modelv1.Tag {
+       tags := make([]*modelv1.Tag, len(n.values))
+       for i := 0; i < len(tags); i++ {
+               tags[i] = &modelv1.Tag{
+                       Key:   tagNames[i],
+                       Value: n.values[i],
+               }
+       }
+       return tags
+}
+
+func (n *topNAggregatorItem) GetIndex() int {
+       return n.index
+}
+
+func (n *topNAggregatorItem) SetIndex(i int) {
+       n.index = i
+}
+
+type topNTimelineItem struct {
+       queue *flow.DedupPriorityQueue
+       items map[string]*topNAggregatorItem
+}
+
+type topNPostProcessor struct {
+       cache     map[string]*topNAggregatorItem
+       timelines map[uint64]*topNTimelineItem
+       items     []*topNAggregatorItem
+       sort      modelv1.Sort
+       aggrFunc  modelv1.AggregationFunction
+       topN      int32
+}
+
+func (taggr *topNPostProcessor) Put(entityValues pbv1.EntityValues, val int64, 
timestampMillis uint64, version int64) {
+       timeline, ok := taggr.timelines[timestampMillis]
+       key := entityValues.String()
+       if !ok {
+               timeline = &topNTimelineItem{
+                       queue: flow.NewPriorityQueue(func(a, b interface{}) int 
{
+                               if taggr.sort == modelv1.Sort_SORT_DESC {
+                                       if a.(*topNAggregatorItem).val < 
b.(*topNAggregatorItem).val {
+                                               return -1
+                                       } else if a.(*topNAggregatorItem).val 
== b.(*topNAggregatorItem).val {
+                                               return 0
+                                       }
+                                       return 1
+                               }
+                               if a.(*topNAggregatorItem).val < 
b.(*topNAggregatorItem).val {
+                                       return 1
+                               } else if a.(*topNAggregatorItem).val == 
b.(*topNAggregatorItem).val {
+                                       return 0
+                               }
+                               return -1
+                       }, false),
+                       items: make(map[string]*topNAggregatorItem),
+               }
+
+               newItem := &topNAggregatorItem{
+                       val:     val,
+                       key:     key,
+                       values:  entityValues,
+                       version: version,
+               }
+
+               timeline.items[key] = newItem
+               heap.Push(timeline.queue, newItem)
+               taggr.timelines[timestampMillis] = timeline
+               return
+       }
+
+       if item, exist := timeline.items[key]; exist {
+               if version >= item.version {
+                       item.val = val
+                       item.version = version
+                       heap.Fix(timeline.queue, item.index)
+               }
+
+               return
+       }
+
+       newItem := &topNAggregatorItem{
+               val:     val,
+               key:     key,
+               values:  entityValues,
+               version: version,
+       }
+
+       if timeline.queue.Len() < int(taggr.topN) {
+               heap.Push(timeline.queue, newItem)
+               timeline.items[key] = newItem
+               return
+       }
+
+       if lowest := timeline.queue.Peek(); lowest != nil {
+               lowestItem := lowest.(*topNAggregatorItem)
+
+               shouldReplace := (taggr.sort == modelv1.Sort_SORT_DESC && 
lowestItem.val < val) ||
+                       (taggr.sort != modelv1.Sort_SORT_DESC && lowestItem.val 
> val)
+
+               if shouldReplace {
+                       delete(timeline.items, lowestItem.key)
+                       timeline.items[key] = newItem
+                       timeline.queue.ReplaceLowest(newItem)
+               }
+       }
+}
+
+func (taggr *topNPostProcessor) Flush() ([]*topNAggregatorItem, error) {
+       var result []*topNAggregatorItem
+
+       if taggr.aggrFunc == 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED {
+               for _, timeline := range taggr.timelines {
+                       for timeline.queue.Len() > 0 {
+                               item := 
heap.Pop(timeline.queue).(*topNAggregatorItem)
+                               result = append(result, item)
+                       }
+               }
+               clear(taggr.timelines)
+       } else {
+               for _, timeline := range taggr.timelines {
+                       for _, item := range timeline.items {
+                               if exist, found := taggr.cache[item.key]; found 
{
+                                       exist.int64Func.In(item.val)
+                                       heap.Fix(taggr, exist.index)
+                                       continue
+                               }
+
+                               aggrFunc, err := 
aggregation.NewFunc[int64](taggr.aggrFunc)
+                               if err != nil {
+                                       return nil, err
+                               }
+
+                               item.int64Func = aggrFunc
+                               item.int64Func.In(item.val)
+
+                               if taggr.Len() < int(taggr.topN) {
+                                       taggr.cache[item.key] = item
+                                       heap.Push(taggr, item)
+                               } else {
+                                       taggr.tryEnqueue(item.key, item)
+                               }
+                       }
+               }
+               result = make([]*topNAggregatorItem, 0, taggr.Len())
+               for taggr.Len() > 0 {
+                       item := heap.Pop(taggr).(*topNAggregatorItem)
+                       result = append(result, item)
+               }
+       }
+
+       return result, nil
+}
+
+func (taggr *topNPostProcessor) Val(tagNames []string) ([]*measurev1.TopNList, 
error) {
+       if taggr.aggrFunc != 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED {
+               return taggr.valWithAggregation(tagNames)
+       }
+
+       return taggr.valWithoutAggregation(tagNames), nil
+}
+
+func (taggr *topNPostProcessor) valWithAggregation(tagNames []string) 
([]*measurev1.TopNList, error) {
+       topNAggregatorItems, err := taggr.Flush()
+       if err != nil {
+               return nil, err
+       }
+       length := len(topNAggregatorItems)
+       items := make([]*measurev1.TopNList_Item, length)
+
+       for i, item := range topNAggregatorItems {
+               targetIdx := length - 1 - i
+
+               items[targetIdx] = &measurev1.TopNList_Item{
+                       Entity: item.GetTags(tagNames),
+                       Value: &modelv1.FieldValue{
+                               Value: &modelv1.FieldValue_Int{
+                                       Int: &modelv1.Int{Value: 
item.int64Func.Val()},
+                               },
+                       },
+               }
+       }
+       return []*measurev1.TopNList{
+               {
+                       Timestamp: timestamppb.Now(),
+                       Items:     items,
+               },
+       }, nil
+}
+
+func (taggr *topNPostProcessor) valWithoutAggregation(tagNames []string) 
[]*measurev1.TopNList {
+       topNLists := make([]*measurev1.TopNList, 0, len(taggr.timelines))
+       for ts, timeline := range taggr.timelines {
+               items := make([]*measurev1.TopNList_Item, timeline.queue.Len())
+               for idx, elem := range timeline.queue.Values() {
+                       items[idx] = &measurev1.TopNList_Item{
+                               Entity: 
elem.(*topNAggregatorItem).GetTags(tagNames),
+                               Value: &modelv1.FieldValue{
+                                       Value: &modelv1.FieldValue_Int{
+                                               Int: &modelv1.Int{Value: 
elem.(*topNAggregatorItem).val},
+                                       },
+                               },
+                       }
+               }
+               topNLists = append(topNLists, &measurev1.TopNList{
+                       Timestamp: timestamppb.New(time.Unix(0, int64(ts))),
+                       Items:     items,
+               })
+       }
+
+       slices.SortStableFunc(topNLists, func(a, b *measurev1.TopNList) int {
+               r := int(a.GetTimestamp().GetSeconds() - 
b.GetTimestamp().GetSeconds())
+               if r != 0 {
+                       return r
+               }
+               return int(a.GetTimestamp().GetNanos() - 
b.GetTimestamp().GetNanos())
+       })
+
+       return topNLists
+}
diff --git a/banyand/measure/topn_post_processor_test.go 
b/banyand/measure/topn_post_processor_test.go
new file mode 100644
index 00000000..95665cd0
--- /dev/null
+++ b/banyand/measure/topn_post_processor_test.go
@@ -0,0 +1,157 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "testing"
+
+       "github.com/google/go-cmp/cmp"
+       "github.com/stretchr/testify/require"
+       "google.golang.org/protobuf/testing/protocmp"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/model"
+)
+
+func TestBlockCursor_MergeTopNResult(t *testing.T) {
+       tests := []struct {
+               srcTopNVal  *TopNValue
+               destTopNVal *TopNValue
+               wantTopNVal *TopNValue
+               name        string
+               sort        modelv1.Sort
+               topN        int32
+       }{
+               {
+                       name: "Test block merge TopN result",
+                       srcTopNVal: &TopNValue{
+                               valueName:      "value",
+                               entityTagNames: []string{"entity_id"},
+                               values:         []int64{1000, 200, 300, 400, 
500},
+                               entities: [][]*modelv1.TagValue{
+                                       {{Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: "entity_1"}}}},
+                                       {{Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: "entity_2"}}}},
+                                       {{Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: "entity_3"}}}},
+                                       {{Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: "entity_4"}}}},
+                                       {{Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: "entity_5"}}}},
+                               },
+                       },
+                       destTopNVal: &TopNValue{
+                               valueName:      "value",
+                               entityTagNames: []string{"entity_id"},
+                               values:         []int64{550, 200, 500, 600, 
400},
+                               entities: [][]*modelv1.TagValue{
+                                       {{Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: "entity_3"}}}},
+                                       {{Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: "entity_4"}}}},
+                                       {{Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: "entity_5"}}}},
+                                       {{Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: "entity_6"}}}},
+                                       {{Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: "entity_7"}}}},
+                               },
+                       },
+                       sort: modelv1.Sort_SORT_DESC,
+                       topN: 3,
+                       wantTopNVal: &TopNValue{
+                               valueName:      "value",
+                               entityTagNames: []string{"entity_id"},
+                               values:         []int64{550, 600, 1000},
+                               entities: [][]*modelv1.TagValue{
+                                       {{Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: "entity_3"}}}},
+                                       {{Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: "entity_6"}}}},
+                                       {{Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: "entity_1"}}}},
+                               },
+                       },
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       srcBuf, err := tt.srcTopNVal.marshal(make([]byte, 0, 
256))
+                       require.NoError(t, err)
+
+                       destBuf, err := tt.destTopNVal.marshal(make([]byte, 0, 
256))
+                       require.NoError(t, err)
+
+                       bc := &blockCursor{
+                               bm:          blockMetadata{seriesID: 1},
+                               tagFamilies: nil,
+                               fields: columnFamily{
+                                       columns: []column{
+                                               {name: "value", valueType: 
pbv1.ValueTypeBinaryData, values: [][]byte{destBuf}},
+                                       },
+                               },
+                               timestamps:     []int64{1},
+                               versions:       []int64{2},
+                               idx:            0,
+                               schemaTagTypes: map[string]pbv1.ValueType{},
+                       }
+
+                       result := &model.MeasureResult{
+                               Timestamps: []int64{1},
+                               Versions:   []int64{1},
+                               TagFamilies: []model.TagFamily{
+                                       {Name: "_topN", Tags: []model.Tag{
+                                               {Name: "name", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue}},
+                                       }},
+                               },
+                               Fields: []model.Field{
+                                       {
+                                               Name:   "value",
+                                               Values: 
[]*modelv1.FieldValue{{Value: &modelv1.FieldValue_BinaryData{BinaryData: 
srcBuf}}},
+                                       },
+                               },
+                       }
+
+                       topNPostAggregator := CreateTopNPostProcessor(tt.topN, 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED, tt.sort)
+
+                       expectedTagValue := "endpoint_resp_time-service"
+                       storedIndexValue := 
map[common.SeriesID]map[string]*modelv1.TagValue{
+                               1: {
+                                       "name": {
+                                               Value: &modelv1.TagValue_Str{
+                                                       Str: 
&modelv1.Str{Value: expectedTagValue},
+                                               },
+                                       },
+                               },
+                       }
+
+                       bc.mergeTopNResult(result, storedIndexValue, 
topNPostAggregator)
+
+                       tagValue := result.TagFamilies[0].Tags[0].Values[0]
+                       require.Equal(t, expectedTagValue, 
tagValue.GetStr().GetValue())
+
+                       mergedFieldValue := result.Fields[0].Values[0]
+                       require.NotNil(t, mergedFieldValue.GetBinaryData())
+
+                       gotTopNVal := GenerateTopNValue()
+                       defer ReleaseTopNValue(gotTopNVal)
+                       decoder := GenerateTopNValuesDecoder()
+                       defer ReleaseTopNValuesDecoder(decoder)
+
+                       err = 
gotTopNVal.Unmarshal(mergedFieldValue.GetBinaryData(), decoder)
+                       require.NoError(t, err)
+
+                       require.Equal(t, tt.wantTopNVal.valueName, 
gotTopNVal.valueName)
+                       require.Equal(t, tt.wantTopNVal.entityTagNames, 
gotTopNVal.entityTagNames)
+                       require.Equal(t, tt.wantTopNVal.values, 
gotTopNVal.values)
+                       diff := cmp.Diff(tt.wantTopNVal.entities, 
gotTopNVal.entities, protocmp.Transform())
+                       require.Empty(t, diff, "entities differ: %s", diff)
+               })
+       }
+}
diff --git a/banyand/query/processor_topn.go b/banyand/query/processor_topn.go
index f3246c1f..90a49bbc 100644
--- a/banyand/query/processor_topn.go
+++ b/banyand/query/processor_topn.go
@@ -18,16 +18,12 @@
 package query
 
 import (
-       "container/heap"
        "context"
        "errors"
        "fmt"
-       "slices"
        "strings"
        "time"
 
-       "google.golang.org/protobuf/types/known/timestamppb"
-
        "github.com/apache/skywalking-banyandb/api/common"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
@@ -35,11 +31,8 @@ import (
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/measure"
        "github.com/apache/skywalking-banyandb/pkg/bus"
-       "github.com/apache/skywalking-banyandb/pkg/flow"
        "github.com/apache/skywalking-banyandb/pkg/logger"
-       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/query"
-       "github.com/apache/skywalking-banyandb/pkg/query/aggregation"
        "github.com/apache/skywalking-banyandb/pkg/query/executor"
        logical_measure 
"github.com/apache/skywalking-banyandb/pkg/query/logical/measure"
 )
@@ -50,29 +43,18 @@ type topNQueryProcessor struct {
        *bus.UnImplementedHealthyListener
 }
 
-func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) 
(resp bus.Message) {
-       request, ok := message.Data().(*measurev1.TopNRequest)
-       n := time.Now()
-       now := n.UnixNano()
-       if !ok {
-               t.log.Warn().Msg("invalid event data type")
-               return
-       }
-       ml := t.log.Named("topn", strings.Join(request.Groups, ","), 
request.Name)
-       if e := ml.Debug(); e.Enabled() {
-               e.RawJSON("req", logger.Proto(request)).Msg("received a topn 
event for groups: " + strings.Join(request.Groups, ","))
-       }
-       if request.GetFieldValueSort() == modelv1.Sort_SORT_UNSPECIFIED {
-               t.log.Warn().Msg("invalid requested sort direction")
-               return
-       }
-       if e := t.log.Debug(); e.Enabled() {
-               e.Stringer("req", request).Msg("received a topN query event")
+type topNQueryContext struct {
+       sourceMeasureSchemas []*databasev1.Measure
+       topNSchemas          []*databasev1.TopNAggregation
+       ecc                  []executor.MeasureExecutionContext
+}
+
+func (t *topNQueryProcessor) prepareGroupsContext(ctx context.Context, request 
*measurev1.TopNRequest, ml *logger.Logger) (*topNQueryContext, error) {
+       qc := &topNQueryContext{
+               sourceMeasureSchemas: make([]*databasev1.Measure, 0, 
len(request.Groups)),
+               topNSchemas:          make([]*databasev1.TopNAggregation, 0, 
len(request.Groups)),
+               ecc:                  make([]executor.MeasureExecutionContext, 
0, len(request.Groups)),
        }
-       // Process all groups
-       var sourceMeasureSchemas []*databasev1.Measure
-       var topNSchemas []*databasev1.TopNAggregation
-       var ecc []executor.MeasureExecutionContext
 
        for _, group := range request.Groups {
                topNMetadata := &commonv1.Metadata{
@@ -84,32 +66,60 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, 
message bus.Message) (resp
                        t.log.Error().Err(err).
                                Str("group", group).
                                Msg("fail to get execution context")
-                       return
+                       return nil, err
                }
                if topNSchema.GetFieldValueSort() != 
modelv1.Sort_SORT_UNSPECIFIED &&
                        topNSchema.GetFieldValueSort() != 
request.GetFieldValueSort() {
                        t.log.Warn().Str("group", group).Msg("unmatched sort 
direction")
-                       return
+                       return nil, err
                }
                sourceMeasure, err := 
t.measureService.Measure(topNSchema.GetSourceMeasure())
                if err != nil {
                        t.log.Error().Err(err).
                                Str("group", group).
                                Msg("fail to find source measure")
-                       return
+                       return nil, err
                }
                topNResultMeasure, err := 
t.measureService.Measure(measure.GetTopNSchemaMetadata(group))
                if err != nil {
                        ml.Error().Err(err).Str("group", group).Msg("fail to 
find topn result measure")
-                       return
+                       return nil, err
                }
 
-               sourceMeasureSchemas = append(sourceMeasureSchemas, 
sourceMeasure.GetSchema())
-               topNSchemas = append(topNSchemas, topNSchema)
-               ecc = append(ecc, topNResultMeasure)
+               qc.sourceMeasureSchemas = append(qc.sourceMeasureSchemas, 
sourceMeasure.GetSchema())
+               qc.topNSchemas = append(qc.topNSchemas, topNSchema)
+               qc.ecc = append(qc.ecc, topNResultMeasure)
+       }
+
+       return qc, nil
+}
+
+func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) 
(resp bus.Message) {
+       request, ok := message.Data().(*measurev1.TopNRequest)
+       n := time.Now()
+       now := n.UnixNano()
+       if !ok {
+               t.log.Warn().Msg("invalid event data type")
+               return
+       }
+       ml := t.log.Named("topn", strings.Join(request.Groups, ","), 
request.Name)
+       if e := ml.Debug(); e.Enabled() {
+               e.RawJSON("req", logger.Proto(request)).Msg("received a topn 
event for groups: " + strings.Join(request.Groups, ","))
+       }
+       if request.GetFieldValueSort() == modelv1.Sort_SORT_UNSPECIFIED {
+               t.log.Warn().Msg("invalid requested sort direction")
+               return
+       }
+       if e := t.log.Debug(); e.Enabled() {
+               e.Stringer("req", request).Msg("received a topN query event")
+       }
+       // Process all groups
+       qc, err := t.prepareGroupsContext(ctx, request, ml)
+       if err != nil {
+               return
        }
 
-       plan, err := logical_measure.TopNAnalyze(request, sourceMeasureSchemas, 
topNSchemas, ecc)
+       plan, err := logical_measure.TopNAnalyze(request, 
qc.sourceMeasureSchemas, qc.topNSchemas, qc.ecc)
        if err != nil {
                resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail 
to analyze the query request for topn %s: %v", request.Name, err))
                return
@@ -186,8 +196,10 @@ func toTopNResponse(dps []*measurev1.DataPoint) 
*measurev1.TopNResponse {
        topNItems := make([]*measurev1.TopNList_Item, len(dps))
        for i, dp := range dps {
                topNItems[i] = &measurev1.TopNList_Item{
-                       Entity: dp.GetTagFamilies()[0].GetTags(),
-                       Value:  dp.GetFields()[0].GetValue(),
+                       Entity:    dp.GetTagFamilies()[0].GetTags(),
+                       Value:     dp.GetFields()[0].GetValue(),
+                       Version:   dp.GetVersion(),
+                       Timestamp: dp.GetTimestamp(),
                }
        }
        topNList = append(topNList, &measurev1.TopNList{
@@ -195,269 +207,3 @@ func toTopNResponse(dps []*measurev1.DataPoint) 
*measurev1.TopNResponse {
        })
        return &measurev1.TopNResponse{Lists: topNList}
 }
-
-var _ heap.Interface = (*postAggregationProcessor)(nil)
-
-type aggregatorItem struct {
-       int64Func aggregation.Func[int64]
-       key       string
-       values    pbv1.EntityValues
-       index     int
-}
-
-func (n *aggregatorItem) GetTags(tagNames []string) []*modelv1.Tag {
-       tags := make([]*modelv1.Tag, len(n.values))
-       for i := 0; i < len(tags); i++ {
-               tags[i] = &modelv1.Tag{
-                       Key:   tagNames[i],
-                       Value: n.values[i],
-               }
-       }
-       return tags
-}
-
-// PostProcessor defines necessary methods for Top-N post processor with or 
without aggregation.
-type PostProcessor interface {
-       Put(entityValues pbv1.EntityValues, val int64, timestampMillis uint64) 
error
-       Val([]string) []*measurev1.TopNList
-}
-
-// CreateTopNPostAggregator creates a Top-N post processor with or without 
aggregation.
-func CreateTopNPostAggregator(topN int32, aggrFunc 
modelv1.AggregationFunction, sort modelv1.Sort) PostProcessor {
-       if aggrFunc == 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED {
-               // if aggregation is not specified, we have to keep all 
timelines
-               return &postNonAggregationProcessor{
-                       topN:      topN,
-                       sort:      sort,
-                       timelines: make(map[uint64]*flow.DedupPriorityQueue),
-               }
-       }
-       aggregator := &postAggregationProcessor{
-               topN:     topN,
-               sort:     sort,
-               aggrFunc: aggrFunc,
-               cache:    make(map[string]*aggregatorItem),
-               items:    make([]*aggregatorItem, 0, topN),
-       }
-       heap.Init(aggregator)
-       return aggregator
-}
-
-// postAggregationProcessor is an implementation of postProcessor with 
aggregation.
-type postAggregationProcessor struct {
-       cache           map[string]*aggregatorItem
-       items           []*aggregatorItem
-       latestTimestamp uint64
-       topN            int32
-       sort            modelv1.Sort
-       aggrFunc        modelv1.AggregationFunction
-}
-
-func (aggr postAggregationProcessor) Len() int {
-       return len(aggr.items)
-}
-
-// Less reports whether min/max heap has to be built.
-// For DESC, a min heap has to be built,
-// while for ASC, a max heap has to be built.
-func (aggr postAggregationProcessor) Less(i, j int) bool {
-       if aggr.sort == modelv1.Sort_SORT_DESC {
-               return aggr.items[i].int64Func.Val() < 
aggr.items[j].int64Func.Val()
-       }
-       return aggr.items[i].int64Func.Val() > aggr.items[j].int64Func.Val()
-}
-
-func (aggr *postAggregationProcessor) Swap(i, j int) {
-       aggr.items[i], aggr.items[j] = aggr.items[j], aggr.items[i]
-       aggr.items[i].index = i
-       aggr.items[j].index = j
-}
-
-func (aggr *postAggregationProcessor) Push(x any) {
-       n := len(aggr.items)
-       item := x.(*aggregatorItem)
-       item.index = n
-       aggr.items = append(aggr.items, item)
-}
-
-func (aggr *postAggregationProcessor) Pop() any {
-       old := aggr.items
-       n := len(old)
-       item := old[n-1]
-       old[n-1] = nil
-       item.index = -1
-       aggr.items = old[0 : n-1]
-       return item
-}
-
-func (aggr *postAggregationProcessor) Put(entityValues pbv1.EntityValues, val 
int64, timestampMillis uint64) error {
-       // update latest ts
-       if aggr.latestTimestamp < timestampMillis {
-               aggr.latestTimestamp = timestampMillis
-       }
-       key := entityValues.String()
-       if item, found := aggr.cache[key]; found {
-               item.int64Func.In(val)
-               return nil
-       }
-
-       aggrFunc, err := aggregation.NewFunc[int64](aggr.aggrFunc)
-       if err != nil {
-               return err
-       }
-       item := &aggregatorItem{
-               key:       key,
-               int64Func: aggrFunc,
-               values:    entityValues,
-       }
-       item.int64Func.In(val)
-
-       if aggr.Len() < int(aggr.topN) {
-               aggr.cache[key] = item
-               heap.Push(aggr, item)
-       } else {
-               aggr.tryEnqueue(key, item)
-       }
-
-       return nil
-}
-
-func (aggr *postAggregationProcessor) tryEnqueue(key string, item 
*aggregatorItem) {
-       if lowest := aggr.items[0]; lowest != nil {
-               if aggr.sort == modelv1.Sort_SORT_DESC && 
lowest.int64Func.Val() < item.int64Func.Val() {
-                       aggr.cache[key] = item
-                       aggr.items[0] = item
-                       heap.Fix(aggr, 0)
-               } else if aggr.sort != modelv1.Sort_SORT_DESC && 
lowest.int64Func.Val() > item.int64Func.Val() {
-                       aggr.cache[key] = item
-                       aggr.items[0] = item
-                       heap.Fix(aggr, 0)
-               }
-       }
-}
-
-func (aggr *postAggregationProcessor) Val(tagNames []string) 
[]*measurev1.TopNList {
-       topNItems := make([]*measurev1.TopNList_Item, aggr.Len())
-
-       for aggr.Len() > 0 {
-               item := heap.Pop(aggr).(*aggregatorItem)
-               topNItems[aggr.Len()] = &measurev1.TopNList_Item{
-                       Entity: item.GetTags(tagNames),
-                       Value: &modelv1.FieldValue{
-                               Value: &modelv1.FieldValue_Int{
-                                       Int: &modelv1.Int{Value: 
item.int64Func.Val()},
-                               },
-                       },
-               }
-       }
-       return []*measurev1.TopNList{
-               {
-                       Timestamp: timestamppb.New(time.Unix(0, 
int64(aggr.latestTimestamp))),
-                       Items:     topNItems,
-               },
-       }
-}
-
-var _ flow.Element = (*nonAggregatorItem)(nil)
-
-type nonAggregatorItem struct {
-       key    string
-       values pbv1.EntityValues
-       val    int64
-       index  int
-}
-
-func (n *nonAggregatorItem) GetTags(tagNames []string) []*modelv1.Tag {
-       tags := make([]*modelv1.Tag, len(n.values))
-       for i := 0; i < len(tags); i++ {
-               tags[i] = &modelv1.Tag{
-                       Key:   tagNames[i],
-                       Value: n.values[i],
-               }
-       }
-       return tags
-}
-
-func (n *nonAggregatorItem) GetIndex() int {
-       return n.index
-}
-
-func (n *nonAggregatorItem) SetIndex(i int) {
-       n.index = i
-}
-
-type postNonAggregationProcessor struct {
-       timelines map[uint64]*flow.DedupPriorityQueue
-       topN      int32
-       sort      modelv1.Sort
-}
-
-func (naggr *postNonAggregationProcessor) Val(tagNames []string) 
[]*measurev1.TopNList {
-       topNLists := make([]*measurev1.TopNList, 0, len(naggr.timelines))
-       for ts, timeline := range naggr.timelines {
-               items := make([]*measurev1.TopNList_Item, timeline.Len())
-               for idx, elem := range timeline.Values() {
-                       items[idx] = &measurev1.TopNList_Item{
-                               Entity: 
elem.(*nonAggregatorItem).GetTags(tagNames),
-                               Value: &modelv1.FieldValue{
-                                       Value: &modelv1.FieldValue_Int{
-                                               Int: &modelv1.Int{Value: 
elem.(*nonAggregatorItem).val},
-                                       },
-                               },
-                       }
-               }
-               topNLists = append(topNLists, &measurev1.TopNList{
-                       Timestamp: timestamppb.New(time.Unix(0, int64(ts))),
-                       Items:     items,
-               })
-       }
-
-       slices.SortStableFunc(topNLists, func(a, b *measurev1.TopNList) int {
-               r := int(a.GetTimestamp().GetSeconds() - 
b.GetTimestamp().GetSeconds())
-               if r != 0 {
-                       return r
-               }
-               return int(a.GetTimestamp().GetNanos() - 
b.GetTimestamp().GetNanos())
-       })
-
-       return topNLists
-}
-
-func (naggr *postNonAggregationProcessor) Put(entityValues pbv1.EntityValues, 
val int64, timestampMillis uint64) error {
-       key := entityValues.String()
-       if timeline, ok := naggr.timelines[timestampMillis]; ok {
-               if timeline.Len() < int(naggr.topN) {
-                       heap.Push(timeline, &nonAggregatorItem{val: val, key: 
key, values: entityValues})
-               } else {
-                       if lowest := timeline.Peek(); lowest != nil {
-                               if naggr.sort == modelv1.Sort_SORT_DESC && 
lowest.(*nonAggregatorItem).val < val {
-                                       
timeline.ReplaceLowest(&nonAggregatorItem{val: val, key: key, values: 
entityValues})
-                               } else if naggr.sort != modelv1.Sort_SORT_DESC 
&& lowest.(*nonAggregatorItem).val > val {
-                                       
timeline.ReplaceLowest(&nonAggregatorItem{val: val, key: key, values: 
entityValues})
-                               }
-                       }
-               }
-               return nil
-       }
-
-       timeline := flow.NewPriorityQueue(func(a, b interface{}) int {
-               if naggr.sort == modelv1.Sort_SORT_DESC {
-                       if a.(*nonAggregatorItem).val < 
b.(*nonAggregatorItem).val {
-                               return -1
-                       } else if a.(*nonAggregatorItem).val == 
b.(*nonAggregatorItem).val {
-                               return 0
-                       }
-                       return 1
-               }
-               if a.(*nonAggregatorItem).val < b.(*nonAggregatorItem).val {
-                       return 1
-               } else if a.(*nonAggregatorItem).val == 
b.(*nonAggregatorItem).val {
-                       return 0
-               }
-               return -1
-       }, false)
-       naggr.timelines[timestampMillis] = timeline
-       heap.Push(timeline, &nonAggregatorItem{val: val, key: key, values: 
entityValues})
-
-       return nil
-}
diff --git a/pkg/query/logical/measure/topn_analyzer.go 
b/pkg/query/logical/measure/topn_analyzer.go
index 4ac59297..34aced93 100644
--- a/pkg/query/logical/measure/topn_analyzer.go
+++ b/pkg/query/logical/measure/topn_analyzer.go
@@ -59,6 +59,7 @@ func TopNAnalyze(criteria *measurev1.TopNRequest, 
sourceMeasureSchemaList []*dat
                        sort:        criteria.FieldValueSort,
                        groupByTags: topNAggSchema.GetGroupByTagNames(),
                        ec:          ec,
+                       number:      criteria.GetTopN(),
                }
        } else {
                subPlans := make([]*unresolvedLocalScan, 0, 
len(sourceMeasureSchemaList))
@@ -71,6 +72,7 @@ func TopNAnalyze(criteria *measurev1.TopNRequest, 
sourceMeasureSchemaList []*dat
                                sort:        criteria.FieldValueSort,
                                groupByTags: 
topNAggSchemaList[i].GetGroupByTagNames(),
                                ec:          ecc[i],
+                               number:      criteria.GetTopN(),
                        })
                }
                plan = &unresolvedTopNMerger{
diff --git a/pkg/query/logical/measure/topn_plan_localscan.go 
b/pkg/query/logical/measure/topn_plan_localscan.go
index 23907764..be85e760 100644
--- a/pkg/query/logical/measure/topn_plan_localscan.go
+++ b/pkg/query/logical/measure/topn_plan_localscan.go
@@ -31,9 +31,7 @@ import (
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/measure"
-       "github.com/apache/skywalking-banyandb/pkg/encoding"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
-       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/query/executor"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
@@ -53,6 +51,7 @@ type unresolvedLocalScan struct {
        conditions  []*modelv1.Condition
        groupByTags []string
        sort        modelv1.Sort
+       number      int32
 }
 
 func (uls *unresolvedLocalScan) Analyze(s logical.Schema) (logical.Plan, 
error) {
@@ -103,6 +102,8 @@ func (uls *unresolvedLocalScan) Analyze(s logical.Schema) 
(logical.Plan, error)
                                },
                        },
                        FieldProjection: fieldProjection,
+                       Sort:            uls.sort,
+                       Number:          uls.number,
                },
                ec: uls.ec,
        }, nil
@@ -195,8 +196,8 @@ func (ei *topNMIterator) Next() bool {
        ei.current = ei.current[:0]
        topNValue := measure.GenerateTopNValue()
        defer measure.ReleaseTopNValue(topNValue)
-       decoder := generateTopNValuesDecoder()
-       defer releaseTopNValuesDecoder(decoder)
+       decoder := measure.GenerateTopNValuesDecoder()
+       defer measure.ReleaseTopNValuesDecoder(decoder)
 
        for i := range r.Timestamps {
                fv := r.Fields[0].Values[i]
@@ -256,18 +257,3 @@ func (ei *topNMIterator) Close() error {
        }
        return ei.err
 }
-
-func generateTopNValuesDecoder() *encoding.BytesBlockDecoder {
-       v := topNValuesDecoderPool.Get()
-       if v == nil {
-               return &encoding.BytesBlockDecoder{}
-       }
-       return v
-}
-
-func releaseTopNValuesDecoder(d *encoding.BytesBlockDecoder) {
-       d.Reset()
-       topNValuesDecoderPool.Put(d)
-}
-
-var topNValuesDecoderPool = 
pool.Register[*encoding.BytesBlockDecoder]("topn-valueDecoder")
diff --git a/pkg/query/model/model.go b/pkg/query/model/model.go
index 3d00a0b6..4aa313ea 100644
--- a/pkg/query/model/model.go
+++ b/pkg/query/model/model.go
@@ -64,6 +64,8 @@ type MeasureQueryOptions struct {
        Entities        [][]*modelv1.TagValue
        TagProjection   []TagProjection
        FieldProjection []string
+       Sort            modelv1.Sort
+       Number          int32
 }
 
 // MeasureResult is the result of a query.
diff --git a/pkg/test/measure/testdata/measures/endpoint_resp_time_minute.json 
b/pkg/test/measure/testdata/measures/endpoint_resp_time_minute.json
new file mode 100644
index 00000000..1de19aae
--- /dev/null
+++ b/pkg/test/measure/testdata/measures/endpoint_resp_time_minute.json
@@ -0,0 +1,37 @@
+{
+  "metadata": {
+    "name": "endpoint_resp_time_minute",
+    "group": "sw_metric"
+  },
+  "tag_families": [
+    {
+      "name": "default",
+      "tags": [
+        {
+          "name": "entity_id",
+          "type": "TAG_TYPE_STRING"
+        }
+      ]
+    }
+  ],
+  "fields": [
+    {
+      "name": "value",
+      "field_type": "FIELD_TYPE_INT",
+      "encoding_method": "ENCODING_METHOD_GORILLA",
+      "compression_method": "COMPRESSION_METHOD_ZSTD"
+    }
+  ],
+  "entity": {
+    "tag_names": [
+      "entity_id"
+    ]
+  },
+  "sharding_key": {
+    "tag_names": [
+      "entity_id"
+    ]
+  },
+  "interval": "1m",
+  "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git 
a/pkg/test/measure/testdata/topn_aggregations/endpoint_resp_time_minute_top_bottom_100.json
 
b/pkg/test/measure/testdata/topn_aggregations/endpoint_resp_time_minute_top_bottom_100.json
new file mode 100644
index 00000000..af2ec806
--- /dev/null
+++ 
b/pkg/test/measure/testdata/topn_aggregations/endpoint_resp_time_minute_top_bottom_100.json
@@ -0,0 +1,17 @@
+{
+  "metadata": {
+    "name": "endpoint_resp_time_minute_top_bottom_100",
+    "group": "sw_metric"
+  },
+  "source_measure": {
+    "name": "endpoint_resp_time_minute",
+    "group": "sw_metric"
+  },
+  "field_name": "value",
+  "field_value_sort": 0,
+  "group_by_tag_names": [
+    "entity_id"
+  ],
+  "counters_number": 1000,
+  "lru_size": 10
+}
\ No newline at end of file
diff --git a/test/cases/init.go b/test/cases/init.go
index 3ec50509..71f3f520 100644
--- a/test/cases/init.go
+++ b/test/cases/init.go
@@ -98,6 +98,8 @@ func Initialize(addr string, now time.Time) {
        casesmeasuredata.Write(conn, "endpoint_traffic", "sw_metric", 
"endpoint_traffic.json", now, interval)
        casesmeasuredata.Write(conn, "duplicated", "exception", 
"duplicated.json", now, 0)
        casesmeasuredata.Write(conn, "service_cpm_minute", "sw_updated", 
"service_cpm_minute_updated_data.json", now.Add(10*time.Minute), interval)
+       casesmeasuredata.Write(conn, "endpoint_resp_time_minute", "sw_metric", 
"endpoint_resp_time_minute_data.json", now, interval)
+       casesmeasuredata.Write(conn, "endpoint_resp_time_minute", "sw_metric", 
"endpoint_resp_time_minute_data1.json", now.Add(10*time.Second), interval)
        casesmeasuredata.WriteMixed(conn, now.Add(30*time.Minute), interval,
                casesmeasuredata.WriteSpec{
                        Metadata: &commonv1.Metadata{Name: 
"service_cpm_minute", Group: "sw_spec"},
diff --git 
a/test/cases/measure/data/testdata/endpoint_resp_time_minute_data.json 
b/test/cases/measure/data/testdata/endpoint_resp_time_minute_data.json
new file mode 100644
index 00000000..59333363
--- /dev/null
+++ b/test/cases/measure/data/testdata/endpoint_resp_time_minute_data.json
@@ -0,0 +1,102 @@
+[
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "entity_1"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 100
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "entity_2"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 200
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "entity_3"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 300
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "entity_4"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 400
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "entity_5"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 500
+        }
+      }
+    ]
+  }
+]
diff --git 
a/test/cases/measure/data/testdata/endpoint_resp_time_minute_data1.json 
b/test/cases/measure/data/testdata/endpoint_resp_time_minute_data1.json
new file mode 100644
index 00000000..efe572fe
--- /dev/null
+++ b/test/cases/measure/data/testdata/endpoint_resp_time_minute_data1.json
@@ -0,0 +1,102 @@
+[
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "entity_1"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 1000
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "entity_2"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 50
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "entity_3"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 230
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "entity_4"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 80
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "entity_5"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 600
+        }
+      }
+    ]
+  }
+]
diff --git a/test/cases/topn/data/input/aggr_version_merged.ql 
b/test/cases/topn/data/input/aggr_version_merged.ql
new file mode 100644
index 00000000..596cb846
--- /dev/null
+++ b/test/cases/topn/data/input/aggr_version_merged.ql
@@ -0,0 +1,22 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+SHOW TOP 3
+FROM MEASURE endpoint_resp_time_minute_top_bottom_100 IN sw_metric
+TIME > '-15m'
+AGGREGATE BY MAX
+ORDER BY DESC
diff --git a/test/cases/topn/data/input/aggr_version_merged.yaml 
b/test/cases/topn/data/input/aggr_version_merged.yaml
new file mode 100644
index 00000000..fb76e724
--- /dev/null
+++ b/test/cases/topn/data/input/aggr_version_merged.yaml
@@ -0,0 +1,22 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "endpoint_resp_time_minute_top_bottom_100"
+groups: ["sw_metric"]
+topN: 3
+fieldValueSort: 1
+agg: 2
diff --git a/test/cases/topn/data/want/aggr_version_merged.yaml 
b/test/cases/topn/data/want/aggr_version_merged.yaml
new file mode 100644
index 00000000..58d3060c
--- /dev/null
+++ b/test/cases/topn/data/want/aggr_version_merged.yaml
@@ -0,0 +1,43 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+lists:
+  - items:
+      - entity:
+          - key: entity_id
+            value:
+              str:
+                value: entity_1
+        value:
+          int:
+            value: "1000"
+      - entity:
+          - key: entity_id
+            value:
+              str:
+                value: entity_5
+        value:
+          int:
+            value: "600"
+      - entity:
+          - key: entity_id
+            value:
+              str:
+                value: entity_3
+        value:
+          int:
+            value: "230"
diff --git a/test/cases/topn/topn.go b/test/cases/topn/topn.go
index bcb1d39b..51b983eb 100644
--- a/test/cases/topn/topn.go
+++ b/test/cases/topn/topn.go
@@ -48,4 +48,5 @@ var _ = g.DescribeTable("TopN Tests", verify,
        g.Entry("using not equal in aggregation", helpers.Args{Input: "ne", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("using in operation in aggregation", helpers.Args{Input: "in", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("using not-in operation in aggregation", helpers.Args{Input: 
"not_in", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+       g.Entry("max top3 with version merged order by desc", 
helpers.Args{Input: "aggr_version_merged", Duration: 25 * time.Minute, Offset: 
-20 * time.Minute}),
 )

Reply via email to