This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch topn
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 8b1186b8f6e14a48d0d7fef85ba0bf36642658b2
Author: Gao Hongtao <[email protected]>
AuthorDate: Wed Dec 11 07:42:27 2024 +0800

    Fix top-n high cardinality
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 CHANGES.md                                       |   1 +
 banyand/measure/topn.go                          | 419 ++++++++++++++++-------
 banyand/measure/topn_test.go                     |  91 +++++
 banyand/measure/write.go                         |   2 +-
 banyand/query/processor_topn.go                  |  22 +-
 pkg/partition/index.go                           |   3 +
 pkg/pb/v1/series.go                              |   7 +-
 pkg/pb/v1/value.go                               |  33 ++
 pkg/query/logical/measure/topn_analyzer.go       | 132 +++----
 pkg/query/logical/measure/topn_plan_localscan.go | 275 +++++++++------
 pkg/schema/cache.go                              |   7 +-
 pkg/schema/init.go                               | 178 +++-------
 pkg/schema/schema.go                             |   3 +
 test/cases/topn/data/want/aggr_desc.yaml         |   5 +-
 14 files changed, 746 insertions(+), 432 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index cc1ca646..c3b8cf1e 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -11,6 +11,7 @@ Release Notes.
 - Index: Support InsertIfAbsent functionality which ensures documents are only 
inserted if their docIDs are not already present in the current index. There is 
an exception for documents with more index fields than the entity's 
index fields.
 - Measure: Introduce "index_mode" to save data exclusively in the series 
index, ideal for non-timeseries measures.
 - Index: Use numeric index type to support Int and Float
+- TopN: Group top-n pre-calculation results by the group key in the newly 
introduced `_top_n_result` measure, which is used to store the pre-calculation 
results.
 
 ### Bug Fixes
 
diff --git a/banyand/measure/topn.go b/banyand/measure/topn.go
index c64270aa..c7649781 100644
--- a/banyand/measure/topn.go
+++ b/banyand/measure/topn.go
@@ -18,6 +18,7 @@
 package measure
 
 import (
+       "bytes"
        "context"
        "encoding/base64"
        "fmt"
@@ -32,7 +33,6 @@ import (
        "golang.org/x/exp/slices"
        "google.golang.org/protobuf/types/known/timestamppb"
 
-       "github.com/apache/skywalking-banyandb/api/common"
        apiData "github.com/apache/skywalking-banyandb/api/data"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
@@ -40,13 +40,17 @@ import (
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/bus"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/flow"
        "github.com/apache/skywalking-banyandb/pkg/flow/streaming"
        "github.com/apache/skywalking-banyandb/pkg/flow/streaming/sources"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/partition"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
+       "github.com/apache/skywalking-banyandb/pkg/schema"
 )
 
 const (
@@ -65,18 +69,20 @@ type dataPointWithEntityValues struct {
        *measurev1.DataPointValue
        entityValues []*modelv1.TagValue
        seriesID     uint64
+       shardID      uint32
 }
 
 type topNStreamingProcessor struct {
-       m             *measure
+       pipeline      queue.Queue
        streamingFlow flow.Flow
+       in            chan flow.StreamRecord
        l             *logger.Logger
-       pipeline      queue.Queue
        topNSchema    *databasev1.TopNAggregation
        src           chan interface{}
-       in            chan flow.StreamRecord
+       m             *measure
        errCh         <-chan error
        stopCh        chan struct{}
+       buf           []byte
        flow.ComponentState
        interval      time.Duration
        sortDirection modelv1.Sort
@@ -137,134 +143,93 @@ func (t *topNStreamingProcessor) 
writeStreamRecord(record flow.StreamRecord) err
        var err error
        publisher := t.pipeline.NewBatchPublisher(resultPersistencyTimeout)
        defer publisher.Close()
+       topNValue := GenerateTopNValue()
+       defer ReleaseTopNValue(topNValue)
        for group, tuples := range tuplesGroups {
                if e := t.l.Debug(); e.Enabled() {
-                       e.Str("TopN", t.topNSchema.GetMetadata().GetName()).
-                               Str("group", group).
-                               Int("rankNums", len(tuples)).
-                               Msg("Write tuples")
+                       for i := range tuples {
+                               tuple := tuples[i]
+                               data := 
tuple.V2.(flow.StreamRecord).Data().(flow.Data)
+                               e.
+                                       Int("rankNums", i+1).
+                                       Str("entityValues", fmt.Sprintf("%v", 
data[0])).
+                                       Int("value", int(data[2].(int64))).
+                                       Time("eventTime", eventTime).
+                                       Msgf("Write tuples %s %s", 
t.topNSchema.GetMetadata().GetName(), group)
+                       }
                }
-               for rankNum, tuple := range tuples {
-                       fieldValue := tuple.V1.(int64)
+               topNValue.Reset()
+               topNValue.setMetadata(t.topNSchema.GetFieldName(), 
t.m.schema.Entity.TagNames)
+               var shardID uint32
+               for _, tuple := range tuples {
                        data := tuple.V2.(flow.StreamRecord).Data().(flow.Data)
-                       err = multierr.Append(err, t.writeData(publisher, 
eventTime, fieldValue, data, rankNum))
+                       topNValue.addValue(
+                               tuple.V1.(int64),
+                               data[0].([]*modelv1.TagValue),
+                       )
+                       shardID = data[3].(uint32)
                }
-       }
-       return err
-}
-
-func (t *topNStreamingProcessor) writeData(publisher queue.BatchPublisher, 
eventTime time.Time, fieldValue int64,
-       data flow.Data, rankNum int,
-) error {
-       var tagValues []*modelv1.TagValue
-       if len(t.topNSchema.GetGroupByTagNames()) > 0 {
-               var ok bool
-               if tagValues, ok = data[3].([]*modelv1.TagValue); !ok {
-                       return errors.New("fail to extract tag values from topN 
result")
+               entityValues := []*modelv1.TagValue{
+                       {
+                               Value: &modelv1.TagValue_Str{
+                                       Str: &modelv1.Str{
+                                               Value: 
t.topNSchema.GetMetadata().GetName(),
+                                       },
+                               },
+                       },
+                       {
+                               Value: &modelv1.TagValue_Int{
+                                       Int: &modelv1.Int{
+                                               Value: int64(t.sortDirection),
+                                       },
+                               },
+                       },
+                       {
+                               Value: &modelv1.TagValue_Str{
+                                       Str: &modelv1.Str{
+                                               Value: group,
+                                       },
+                               },
+                       },
                }
-       }
-       for _, tv := range tagValues {
-               if tv.Value == nil {
-                       t.l.Warn().Msg("tag value is nil")
+               t.buf = t.buf[:0]
+               if t.buf, err = topNValue.marshal(t.buf); err != nil {
+                       return err
                }
-       }
-       series, shardID, err := t.locate(tagValues, rankNum)
-       if err != nil {
-               return err
-       }
-
-       iwr := &measurev1.InternalWriteRequest{
-               Request: &measurev1.WriteRequest{
-                       MessageId: uint64(time.Now().UnixNano()),
-                       Metadata:  t.topNSchema.GetMetadata(),
-                       DataPoint: &measurev1.DataPointValue{
-                               Timestamp: timestamppb.New(eventTime),
-                               TagFamilies: []*modelv1.TagFamilyForWrite{
-                                       {
-                                               Tags: append(
-                                                       
data[0].([]*modelv1.TagValue),
-                                                       // SortDirection
-                                                       &modelv1.TagValue{
-                                                               Value: 
&modelv1.TagValue_Int{
-                                                                       Int: 
&modelv1.Int{
-                                                                               
Value: int64(t.sortDirection),
-                                                                       },
-                                                               },
-                                                       },
-                                                       // RankNumber
-                                                       &modelv1.TagValue{
-                                                               Value: 
&modelv1.TagValue_Int{
-                                                                       Int: 
&modelv1.Int{
-                                                                               
Value: int64(rankNum),
-                                                                       },
-                                                               },
-                                                       },
-                                               ),
+               iwr := &measurev1.InternalWriteRequest{
+                       Request: &measurev1.WriteRequest{
+                               MessageId: uint64(time.Now().UnixNano()),
+                               Metadata:  &commonv1.Metadata{Name: 
schema.TopNSchemaName, Group: t.topNSchema.GetMetadata().Group},
+                               DataPoint: &measurev1.DataPointValue{
+                                       Timestamp: timestamppb.New(eventTime),
+                                       TagFamilies: 
[]*modelv1.TagFamilyForWrite{
+                                               {Tags: entityValues},
                                        },
-                               },
-                               Fields: []*modelv1.FieldValue{
-                                       {
-                                               Value: &modelv1.FieldValue_Int{
-                                                       Int: &modelv1.Int{
-                                                               Value: 
fieldValue,
+                                       Fields: []*modelv1.FieldValue{
+                                               {
+                                                       Value: 
&modelv1.FieldValue_BinaryData{
+                                                               BinaryData: 
bytes.Clone(t.buf),
                                                        },
                                                },
                                        },
                                },
                        },
-               },
-               EntityValues: series.EntityValues,
-               ShardId:      uint32(shardID),
+                       EntityValues: entityValues,
+                       ShardId:      shardID,
+               }
+               message := 
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), "local", iwr)
+               _, err = publisher.Publish(context.TODO(), 
apiData.TopicMeasureWrite, message)
+               if err != nil {
+                       return err
+               }
        }
-       message := 
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), "local", iwr)
-       _, errWritePub := publisher.Publish(context.TODO(), 
apiData.TopicMeasureWrite, message)
-       return errWritePub
+       return err
 }
 
 func (t *topNStreamingProcessor) downSampleTimeBucket(eventTimeMillis int64) 
time.Time {
        return time.UnixMilli(eventTimeMillis - 
eventTimeMillis%t.interval.Milliseconds())
 }
 
-func (t *topNStreamingProcessor) locate(tagValues []*modelv1.TagValue, rankNum 
int) (*pbv1.Series, common.ShardID, error) {
-       if len(tagValues) != 0 && len(t.topNSchema.GetGroupByTagNames()) != 
len(tagValues) {
-               return nil, 0, errors.New("no enough tag values for the entity")
-       }
-       series := &pbv1.Series{
-               Subject:      t.topNSchema.GetMetadata().GetName(),
-               EntityValues: make([]*modelv1.TagValue, 1+1+len(tagValues)),
-       }
-
-       copy(series.EntityValues, tagValues)
-       series.EntityValues[len(series.EntityValues)-2] = &modelv1.TagValue{
-               Value: &modelv1.TagValue_Int{
-                       Int: &modelv1.Int{
-                               Value: int64(t.sortDirection),
-                       },
-               },
-       }
-       series.EntityValues[len(series.EntityValues)-1] = &modelv1.TagValue{
-               Value: &modelv1.TagValue_Int{
-                       Int: &modelv1.Int{
-                               Value: int64(rankNum),
-                       },
-               },
-       }
-       if err := series.Marshal(); err != nil {
-               return nil, 0, fmt.Errorf("fail to marshal series: %w", err)
-       }
-       src := make([]byte, len(series.Buffer))
-       copy(src, series.Buffer)
-       var s1 pbv1.Series
-       if err := s1.Unmarshal(src); err != nil {
-               return nil, 0, fmt.Errorf("fail to unmarshal series 
encoded:[%s] tagValues:[%s]: %w", series.Buffer, series.EntityValues, err)
-       }
-       id, err := partition.ShardID(series.Buffer, t.m.shardNum)
-       if err != nil {
-               return nil, 0, err
-       }
-       return series, common.ShardID(id), nil
-}
-
 func (t *topNStreamingProcessor) start() *topNStreamingProcessor {
        flushInterval := t.interval
        if flushInterval > maxFlushInterval {
@@ -326,7 +291,7 @@ func (manager *topNProcessorManager) Close() error {
        return err
 }
 
-func (manager *topNProcessorManager) onMeasureWrite(seriesID uint64, request 
*measurev1.InternalWriteRequest) {
+func (manager *topNProcessorManager) onMeasureWrite(seriesID uint64, shardID 
uint32, request *measurev1.InternalWriteRequest) {
        go func() {
                manager.RLock()
                defer manager.RUnlock()
@@ -336,6 +301,7 @@ func (manager *topNProcessorManager) 
onMeasureWrite(seriesID uint64, request *me
                                        request.GetRequest().GetDataPoint(),
                                        request.GetEntityValues(),
                                        seriesID,
+                                       shardID,
                                }, 
request.GetRequest().GetDataPoint().GetTimestamp())
                        }
                }
@@ -381,6 +347,7 @@ func (manager *topNProcessorManager) start() error {
                                stopCh:        make(chan struct{}),
                                streamingFlow: streamingFlow,
                                pipeline:      manager.pipeline,
+                               buf:           make([]byte, 0, 64),
                        }
                        processorList[i] = processor.start()
                }
@@ -439,8 +406,9 @@ func (manager *topNProcessorManager) buildMapper(fieldName 
string, groupByNames
                                "",
                                // field value as v2
                                
dpWithEvs.GetFields()[fieldIdx].GetInt().GetValue(),
-                               // groupBy tag values as v3
-                               nil,
+                               // shardID values as v3
+                               dpWithEvs.shardID,
+                               // seriesID values as v4
                                dpWithEvs.seriesID,
                        }
                }, nil
@@ -455,15 +423,14 @@ func (manager *topNProcessorManager) 
buildMapper(fieldName string, groupByNames
                        // EntityValues as identity
                        dpWithEvs.entityValues,
                        // save string representation of group values as the 
key, i.e. v1
-                       strings.Join(transform(groupLocator, func(locator 
partition.TagLocator) string {
-                               return 
stringify(extractTagValue(dpWithEvs.DataPointValue, locator))
-                       }), "|"),
+                       GroupName(transform(groupLocator, func(locator 
partition.TagLocator) string {
+                               return 
Stringify(extractTagValue(dpWithEvs.DataPointValue, locator))
+                       })),
                        // field value as v2
                        dpWithEvs.GetFields()[fieldIdx].GetInt().GetValue(),
-                       // groupBy tag values as v3
-                       transform(groupLocator, func(locator 
partition.TagLocator) *modelv1.TagValue {
-                               return 
extractTagValue(dpWithEvs.DataPointValue, locator)
-                       }),
+                       // shardID values as v3
+                       dpWithEvs.shardID,
+                       // seriesID values as v4
                        dpWithEvs.seriesID,
                }
        }, nil
@@ -499,7 +466,8 @@ func extractTagValue(dpv *measurev1.DataPointValue, locator 
partition.TagLocator
        return tagFamily.GetTags()[locator.TagOffset]
 }
 
-func stringify(tagValue *modelv1.TagValue) string {
+// Stringify converts a TagValue to a string.
+func Stringify(tagValue *modelv1.TagValue) string {
        switch v := tagValue.GetValue().(type) {
        case *modelv1.TagValue_Str:
                return v.Str.GetValue()
@@ -525,3 +493,208 @@ func transform[I, O any](input []I, mapper func(I) O) []O 
{
        }
        return output
 }
+
+// GenerateTopNValue returns a new TopNValue instance.
+func GenerateTopNValue() *TopNValue {
+       v := topNValuePool.Get()
+       if v == nil {
+               return &TopNValue{}
+       }
+       return v
+}
+
+// ReleaseTopNValue releases a TopNValue instance.
+func ReleaseTopNValue(bi *TopNValue) {
+       bi.Reset()
+       topNValuePool.Put(bi)
+}
+
+var topNValuePool = pool.Register[*TopNValue]("measure-topNValue")
+
+// TopNValue represents the topN value.
+type TopNValue struct {
+       valueName      string
+       entityTagNames []string
+       values         []int64
+       entities       [][]*modelv1.TagValue
+       entityValues   [][]byte
+       buf            []byte
+       encodeType     encoding.EncodeType
+       firstValue     int64
+}
+
+func (t *TopNValue) setMetadata(valueName string, entityTagNames []string) {
+       t.valueName = valueName
+       t.entityTagNames = entityTagNames
+}
+
+func (t *TopNValue) addValue(value int64, entityValues []*modelv1.TagValue) {
+       t.values = append(t.values, value)
+       t.entities = append(t.entities, entityValues)
+}
+
+// Values returns the valueName, entityTagNames, values, and entities.
+func (t *TopNValue) Values() (string, []string, []int64, 
[][]*modelv1.TagValue) {
+       return t.valueName, t.entityTagNames, t.values, t.entities
+}
+
+// Reset resets the TopNValue.
+func (t *TopNValue) Reset() {
+       t.valueName = ""
+       t.entityTagNames = t.entityTagNames[:0]
+       t.values = t.values[:0]
+       for i := range t.entities {
+               t.entities[i] = t.entities[i][:0]
+       }
+       t.entities = t.entities[:0]
+       t.buf = t.buf[:0]
+       t.encodeType = encoding.EncodeTypeUnknown
+       t.firstValue = 0
+       for i := range t.entityValues {
+               t.entityValues[i] = t.entityValues[i][:0]
+       }
+       t.entityValues = t.entityValues[:0]
+}
+
+func (t *TopNValue) resizeEntityValues(size int) [][]byte {
+       entityValues := t.entityValues
+       if n := size - cap(entityValues); n > 0 {
+               entityValues = append(entityValues[:cap(entityValues)], 
make([][]byte, n)...)
+       }
+       t.entityValues = entityValues[:size]
+       return t.entityValues
+}
+
+func (t *TopNValue) resizeEntities(size int, entitySize int) 
[][]*modelv1.TagValue {
+       entities := t.entities
+       if n := size - cap(t.entities); n > 0 {
+               entities = append(entities[:cap(entities)], 
make([][]*modelv1.TagValue, n)...)
+       }
+       t.entities = entities[:size]
+       for i := range t.entities {
+               entity := t.entities[i]
+               if n := entitySize - cap(entity); n > 0 {
+                       entity = append(entity[:cap(entity)], 
make([]*modelv1.TagValue, n)...)
+               }
+               t.entities[i] = entity[:0]
+       }
+       return t.entities
+}
+
+func (t *TopNValue) marshal(dst []byte) ([]byte, error) {
+       if len(t.values) == 0 {
+               return nil, errors.New("values is empty")
+       }
+       dst = encoding.EncodeBytes(dst, convert.StringToBytes(t.valueName))
+       dst = encoding.VarUint64ToBytes(dst, uint64(len(t.entityTagNames)))
+       for _, entityTagName := range t.entityTagNames {
+               dst = encoding.EncodeBytes(dst, 
convert.StringToBytes(entityTagName))
+       }
+
+       dst = encoding.VarUint64ToBytes(dst, uint64(len(t.values)))
+
+       t.buf, t.encodeType, t.firstValue = encoding.Int64ListToBytes(t.buf, 
t.values)
+       dst = append(dst, byte(t.encodeType))
+       dst = encoding.VarInt64ToBytes(dst, t.firstValue)
+       dst = encoding.VarUint64ToBytes(dst, uint64(len(t.buf)))
+       dst = append(dst, t.buf...)
+
+       var err error
+       t.entityValues = t.resizeEntityValues(len(t.entities))
+       for i, tvv := range t.entities {
+               t.entityValues[i], err = 
pbv1.MarshalTagValues(t.entityValues[i], tvv)
+               if err != nil {
+                       return nil, err
+               }
+       }
+       dst = encoding.EncodeBytesBlock(dst, t.entityValues)
+       return dst, nil
+}
+
+// Unmarshal unmarshals the TopNValue from the src.
+func (t *TopNValue) Unmarshal(src []byte, decoder *encoding.BytesBlockDecoder) 
error {
+       var err error
+       src, nameBytes, err := encoding.DecodeBytes(src)
+       if err != nil {
+               return fmt.Errorf("cannot unmarshal topNValue.name: %w", err)
+       }
+       t.valueName = convert.BytesToString(nameBytes)
+
+       var entityTagNamesCount uint64
+       src, entityTagNamesCount, err = encoding.BytesToVarUint64(src)
+       if err != nil {
+               return fmt.Errorf("cannot unmarshal 
topNValue.entityTagNamesCount: %w", err)
+       }
+       t.entityTagNames = make([]string, 0, entityTagNamesCount)
+       var entityTagNameBytes []byte
+       for i := uint64(0); i < entityTagNamesCount; i++ {
+               src, entityTagNameBytes, err = encoding.DecodeBytes(src)
+               if err != nil {
+                       return fmt.Errorf("cannot unmarshal 
topNValue.entityTagName: %w", err)
+               }
+               t.entityTagNames = append(t.entityTagNames, 
convert.BytesToString(entityTagNameBytes))
+       }
+
+       var valuesCount uint64
+       src, valuesCount, err = encoding.BytesToVarUint64(src)
+       if err != nil {
+               return fmt.Errorf("cannot unmarshal topNValue.valuesCount: %w", 
err)
+       }
+
+       if len(src) < 1 {
+               return fmt.Errorf("cannot unmarshal topNValue.encodeType: src 
is too short")
+       }
+       t.encodeType = encoding.EncodeType(src[0])
+       src = src[1:]
+
+       if len(src) < 1 {
+               return fmt.Errorf("cannot unmarshal topNValue.firstValue: src 
is too short")
+       }
+       src, t.firstValue, err = encoding.BytesToVarInt64(src)
+       if err != nil {
+               return fmt.Errorf("cannot unmarshal topNValue.firstValue: %w", 
err)
+       }
+       if len(src) < 1 {
+               return fmt.Errorf("cannot unmarshal topNValue.valueLen: src is 
too short")
+       }
+       var valueLen uint64
+       src, valueLen, err = encoding.BytesToVarUint64(src)
+       if err != nil {
+               return fmt.Errorf("cannot unmarshal topNValue.valueLen: %w", 
err)
+       }
+
+       if uint64(len(src)) < valueLen {
+               return fmt.Errorf("src is too short for reading string with 
size %d; len(src)=%d", valueLen, len(src))
+       }
+
+       t.values, err = encoding.BytesToInt64List(t.values, src[:valueLen], 
t.encodeType, t.firstValue, int(valuesCount))
+       if err != nil {
+               return fmt.Errorf("cannot unmarshal topNValue.values: %w", err)
+       }
+
+       if uint64(len(src)) < valueLen {
+               return fmt.Errorf("src is too short for reading string with 
size %d; len(src)=%d", valueLen, len(src))
+       }
+
+       decoder.Reset()
+       t.entityValues, err = decoder.Decode(t.entityValues, src[valueLen:], 
valuesCount)
+       if err != nil {
+               return fmt.Errorf("cannot unmarshal topNValue.entityValues: 
%w", err)
+       }
+       t.resizeEntities(len(t.entityValues), int(entityTagNamesCount))
+       for i, ev := range t.entityValues {
+               t.buf, t.entities[i], err = pbv1.UnmarshalTagValues(t.buf, 
t.entities[i], ev)
+               if err != nil {
+                       return fmt.Errorf("cannot unmarshal 
topNValue.entityValues[%d]: %w", i, err)
+               }
+               if len(t.entities[i]) != len(t.entityTagNames) {
+                       return fmt.Errorf("entityValues[%d] length is not equal 
to entityTagNames", i)
+               }
+       }
+       return nil
+}
+
+// GroupName returns the group name.
+func GroupName(groupTags []string) string {
+       return strings.Join(groupTags, "|")
+}
diff --git a/banyand/measure/topn_test.go b/banyand/measure/topn_test.go
new file mode 100644
index 00000000..b416ec6b
--- /dev/null
+++ b/banyand/measure/topn_test.go
@@ -0,0 +1,91 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/require"
+
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+)
+
+func TestTopNValue_MarshalUnmarshal(t *testing.T) {
+       tests := []struct {
+               topNVal *TopNValue
+               name    string
+       }{
+               {
+                       name: "simple case",
+                       topNVal: &TopNValue{
+                               valueName:      "testValue",
+                               entityTagNames: []string{"tag1", "tag2"},
+                               values:         []int64{1, 2, 3},
+                               entities: [][]*modelv1.TagValue{
+                                       {
+                                               {Value: 
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc1"}}},
+                                               {Value: 
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "entity1"}}},
+                                       },
+                                       {
+                                               {Value: 
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc1"}}},
+                                               {Value: 
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "entity2"}}},
+                                       },
+                                       {
+                                               {Value: 
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc1"}}},
+                                               {Value: 
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "entity3"}}},
+                                       },
+                               },
+                       },
+               },
+               {
+                       name: "single",
+                       topNVal: &TopNValue{
+                               valueName:      "testValue",
+                               entityTagNames: []string{"tag1", "tag2"},
+                               values:         []int64{1},
+                               entities: [][]*modelv1.TagValue{
+                                       {
+                                               {Value: 
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc1"}}},
+                                               {Value: 
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "entity1"}}},
+                                       },
+                               },
+                       },
+               },
+       }
+       decoder := generateColumnValuesDecoder()
+       defer releaseColumnValuesDecoder(decoder)
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       // Marshal the topNValue
+                       dst, err := tt.topNVal.marshal(nil)
+                       require.NoError(t, err)
+
+                       // Unmarshal the topNValue
+                       var decodedTopNValue TopNValue
+                       err = decodedTopNValue.Unmarshal(dst, decoder)
+                       require.NoError(t, err)
+
+                       // Compare the original and decoded topNValue
+                       require.Equal(t, tt.topNVal.valueName, 
decodedTopNValue.valueName)
+                       require.Equal(t, tt.topNVal.entityTagNames, 
decodedTopNValue.entityTagNames)
+                       require.Equal(t, tt.topNVal.values, 
decodedTopNValue.values)
+                       require.Equal(t, tt.topNVal.entities, 
decodedTopNValue.entities)
+               })
+       }
+}
diff --git a/banyand/measure/write.go b/banyand/measure/write.go
index b1e9810b..859cf09a 100644
--- a/banyand/measure/write.go
+++ b/banyand/measure/write.go
@@ -145,7 +145,7 @@ func (w *writeCallback) handle(dst 
map[string]*dataPointsInGroup, writeEvent *me
        dpt.dataPoints.fields = append(dpt.dataPoints.fields, field)
 
        if stm.processorManager != nil {
-               stm.processorManager.onMeasureWrite(uint64(series.ID), 
&measurev1.InternalWriteRequest{
+               stm.processorManager.onMeasureWrite(uint64(series.ID), 
uint32(shardID), &measurev1.InternalWriteRequest{
                        Request: &measurev1.WriteRequest{
                                Metadata:  stm.GetSchema().Metadata,
                                DataPoint: req.DataPoint,
diff --git a/banyand/query/processor_topn.go b/banyand/query/processor_topn.go
index e042df76..234230f2 100644
--- a/banyand/query/processor_topn.go
+++ b/banyand/query/processor_topn.go
@@ -40,6 +40,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/query/aggregation"
        "github.com/apache/skywalking-banyandb/pkg/query/executor"
        logical_measure 
"github.com/apache/skywalking-banyandb/pkg/query/logical/measure"
+       pkgschema "github.com/apache/skywalking-banyandb/pkg/schema"
 )
 
 type topNQueryProcessor struct {
@@ -94,21 +95,7 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, 
message bus.Message) (resp
                return
        }
 
-       topNResultMeasure, err := t.measureService.Measure(topNMetadata)
-       if err != nil {
-               t.log.Error().Err(err).
-                       Str("topN", topNMetadata.GetName()).
-                       Msg("fail to find topN result measure")
-               return
-       }
-
-       s, err := logical_measure.BuildTopNSchema(topNResultMeasure.GetSchema())
-       if err != nil {
-               t.log.Error().Err(err).
-                       Str("topN", topNMetadata.GetName()).
-                       Msg("fail to build schema")
-       }
-       plan, err := logical_measure.TopNAnalyze(ctx, request, 
topNResultMeasure.GetSchema(), sourceMeasure.GetSchema(), s)
+       plan, err := logical_measure.TopNAnalyze(request, 
sourceMeasure.GetSchema(), topNSchema)
        if err != nil {
                resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail 
to analyze the query request for topn %s: %v", topNMetadata.GetName(), err))
                return
@@ -117,6 +104,11 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, 
message bus.Message) (resp
        if e := ml.Debug(); e.Enabled() {
                e.Str("plan", plan.String()).Msg("topn plan")
        }
+       topNResultMeasure, err := 
t.measureService.Measure(pkgschema.GetTopNSchemaMetadata(topNMetadata.Group))
+       if err != nil {
+               ml.Error().Err(err).Str("topN", 
topNMetadata.GetName()).Msg("fail to find topn result measure")
+               return
+       }
        var tracer *query.Tracer
        var span *query.Span
        if request.Trace {
diff --git a/pkg/partition/index.go b/pkg/partition/index.go
index bbb4daf0..f3c69833 100644
--- a/pkg/partition/index.go
+++ b/pkg/partition/index.go
@@ -49,6 +49,9 @@ func ParseIndexRuleLocators(entity *databasev1.Entity, 
families []*databasev1.Ta
        }
        findIndexRuleByTagName := func(tagName string) *databasev1.IndexRule {
                for i := range indexRules {
+                       if indexRules[i] == nil {
+                               continue
+                       }
                        for j := range indexRules[i].Tags {
                                if indexRules[i].Tags[j] == tagName {
                                        return indexRules[i]
diff --git a/pkg/pb/v1/series.go b/pkg/pb/v1/series.go
index 542343d9..31a6f289 100644
--- a/pkg/pb/v1/series.go
+++ b/pkg/pb/v1/series.go
@@ -48,10 +48,9 @@ func (s *Series) CopyTo(dst *Series) {
 func (s *Series) Marshal() error {
        s.Buffer = marshalEntityValue(s.Buffer, 
convert.StringToBytes(s.Subject))
        var err error
-       for _, tv := range s.EntityValues {
-               if s.Buffer, err = marshalTagValue(s.Buffer, tv); err != nil {
-                       return errors.WithMessage(err, "marshal subject and 
entity values")
-               }
+       s.Buffer, err = MarshalTagValues(s.Buffer, s.EntityValues)
+       if err != nil {
+               return errors.WithMessage(err, "marshal subject and entity 
values")
        }
        s.ID = common.SeriesID(convert.Hash(s.Buffer))
        return nil
diff --git a/pkg/pb/v1/value.go b/pkg/pb/v1/value.go
index 1e73e6c7..b9c6fa80 100644
--- a/pkg/pb/v1/value.go
+++ b/pkg/pb/v1/value.go
@@ -97,6 +97,33 @@ func MustTagValueToStr(tag *modelv1.TagValue) string {
        }
 }
 
+// MarshalTagValues marshals tag values.
+func MarshalTagValues(dest []byte, tags []*modelv1.TagValue) ([]byte, error) {
+       var err error
+       for _, tag := range tags {
+               dest, err = marshalTagValue(dest, tag)
+               if err != nil {
+                       return nil, err
+               }
+       }
+       return dest, nil
+}
+
+// UnmarshalTagValues unmarshals tag values.
+func UnmarshalTagValues(dest []byte, destTags []*modelv1.TagValue, src []byte) 
([]byte, []*modelv1.TagValue, error) {
+       var err error
+       var tag *modelv1.TagValue
+       for len(src) > 0 {
+               dest = dest[:0]
+               dest, src, tag, err = unmarshalTagValue(dest, src)
+               if err != nil {
+                       return nil, nil, err
+               }
+               destTags = append(destTags, tag)
+       }
+       return dest, destTags, nil
+}
+
 func marshalTagValue(dest []byte, tv *modelv1.TagValue) ([]byte, error) {
        dest = append(dest, byte(MustTagValueToValueType(tv)))
        switch tv.Value.(type) {
@@ -136,6 +163,9 @@ func unmarshalTagValue(dest []byte, src []byte) ([]byte, 
[]byte, *modelv1.TagVal
                if dest, src, err = unmarshalEntityValue(dest, src[1:]); err != 
nil {
                        return nil, nil, nil, errors.WithMessage(err, 
"unmarshal string tag value")
                }
+               if len(dest) == 0 {
+                       return dest, src, NullTagValue, nil
+               }
                return dest, src, &modelv1.TagValue{
                        Value: &modelv1.TagValue_Str{
                                Str: &modelv1.Str{
@@ -158,6 +188,9 @@ func unmarshalTagValue(dest []byte, src []byte) ([]byte, 
[]byte, *modelv1.TagVal
                if dest, src, err = unmarshalEntityValue(dest, src[1:]); err != 
nil {
                        return nil, nil, nil, errors.WithMessage(err, 
"unmarshal binary tag value")
                }
+               if len(dest) == 0 {
+                       return dest, src, NullTagValue, nil
+               }
                data := make([]byte, len(dest))
                copy(data, dest)
                return dest, src, &modelv1.TagValue{
diff --git a/pkg/query/logical/measure/topn_analyzer.go 
b/pkg/query/logical/measure/topn_analyzer.go
index 4d72787f..eed24da5 100644
--- a/pkg/query/logical/measure/topn_analyzer.go
+++ b/pkg/query/logical/measure/topn_analyzer.go
@@ -19,69 +19,49 @@
 package measure
 
 import (
-       "context"
-       "fmt"
+       "errors"
 
-       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
-       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
+       pkgschema "github.com/apache/skywalking-banyandb/pkg/schema"
 )
 
-// BuildTopNSchema returns Schema loaded from the metadata repository.
-func BuildTopNSchema(md *databasev1.Measure) (logical.Schema, error) {
-       md.GetEntity()
-
-       ms := &schema{
-               common: &logical.CommonSchema{
-                       TagSpecMap: make(map[string]*logical.TagSpec),
-                       EntityList: md.GetEntity().GetTagNames(),
-               },
-               measure:  md,
-               fieldMap: make(map[string]*logical.FieldSpec),
-       }
-
-       ms.common.RegisterTagFamilies(md.GetTagFamilies())
-
-       for fieldIdx, spec := range md.GetFields() {
-               ms.registerField(fieldIdx, spec)
-       }
-
-       return ms, nil
-}
-
 // TopNAnalyze converts logical expressions to executable operation tree 
represented by Plan.
-func TopNAnalyze(_ context.Context, criteria *measurev1.TopNRequest, schema 
*databasev1.Measure,
-       sourceMeasureSchema *databasev1.Measure, s logical.Schema,
+func TopNAnalyze(criteria *measurev1.TopNRequest, sourceMeasureSchema 
*databasev1.Measure, topNAggSchema *databasev1.TopNAggregation,
 ) (logical.Plan, error) {
-       groupByProjectionTags := sourceMeasureSchema.GetEntity().GetTagNames()
-       groupByTags := make([][]*logical.Tag, len(schema.GetTagFamilies()))
-       tagFamily := schema.GetTagFamilies()[0]
-       groupByTags[0] = logical.NewTags(tagFamily.GetName(), 
groupByProjectionTags...)
-
-       if len(schema.GetFields()) != 1 {
-               return nil, fmt.Errorf("topN schema fields count should be 1 
but got %d", len(schema.GetFields()))
-       }
-       projectionFields := make([]*logical.Field, 1)
-       fieldName := schema.GetFields()[0].GetName()
-       projectionFields[0] = logical.NewField(fieldName)
        // parse fields
-       plan := parse(criteria, schema.GetMetadata(), projectionFields, 
groupByTags)
+       timeRange := criteria.GetTimeRange()
+       var plan logical.UnresolvedPlan
+       plan = &unresolvedLocalScan{
+               name:        criteria.Name,
+               startTime:   timeRange.GetBegin().AsTime(),
+               endTime:     timeRange.GetEnd().AsTime(),
+               conditions:  criteria.Conditions,
+               sort:        criteria.FieldValueSort,
+               groupByTags: topNAggSchema.GroupByTagNames,
+       }
+       s, err := buildVirtualSchema(sourceMeasureSchema, 
topNAggSchema.FieldName)
+       if err != nil {
+               return nil, err
+       }
 
        if criteria.GetAgg() != 0 {
+               groupByProjectionTags := 
sourceMeasureSchema.GetEntity().GetTagNames()
+               groupByTags := 
[][]*logical.Tag{logical.NewTags(pkgschema.TopNTagFamily, 
groupByProjectionTags...)}
                plan = newUnresolvedGroupBy(plan, groupByTags, false)
                plan = newUnresolvedAggregation(plan,
-                       projectionFields[0],
+                       &logical.Field{Name: topNAggSchema.FieldName},
                        criteria.GetAgg(),
                        true)
        }
 
        plan = top(plan, &measurev1.QueryRequest_Top{
                Number:         criteria.GetTopN(),
-               FieldName:      fieldName,
+               FieldName:      topNAggSchema.FieldName,
                FieldValueSort: criteria.GetFieldValueSort(),
        })
+
        p, err := plan.Analyze(s)
        if err != nil {
                return nil, err
@@ -89,26 +69,54 @@ func TopNAnalyze(_ context.Context, criteria 
*measurev1.TopNRequest, schema *dat
        return p, nil
 }
 
-func parse(criteria *measurev1.TopNRequest, metadata *commonv1.Metadata,
-       projFields []*logical.Field, projTags [][]*logical.Tag,
-) logical.UnresolvedPlan {
-       timeRange := criteria.GetTimeRange()
-       return local(timeRange.GetBegin().AsTime(), timeRange.GetEnd().AsTime(),
-               metadata, projTags, projFields, buildConditions(criteria), 
criteria.GetFieldValueSort())
-}
-
-func buildConditions(criteria *measurev1.TopNRequest) []*modelv1.Condition {
-       return append([]*modelv1.Condition{
-               {
-                       Name: "sortDirection",
-                       Op:   modelv1.Condition_BINARY_OP_EQ,
-                       Value: &modelv1.TagValue{
-                               Value: &modelv1.TagValue_Int{
-                                       Int: &modelv1.Int{
-                                               Value: 
int64(criteria.GetFieldValueSort().Number()),
-                                       },
-                               },
+func buildVirtualSchema(sourceMeasureSchema *databasev1.Measure, fieldName 
string) (logical.Schema, error) {
+       var tags []*databasev1.TagSpec
+       for _, tag := range sourceMeasureSchema.GetEntity().TagNames {
+               for i := range sourceMeasureSchema.GetTagFamilies() {
+                       for j := range 
sourceMeasureSchema.GetTagFamilies()[i].Tags {
+                               if 
sourceMeasureSchema.GetTagFamilies()[i].Tags[j].Name == tag {
+                                       tags = append(tags, 
sourceMeasureSchema.GetTagFamilies()[i].Tags[j])
+                                       if len(tags) == 
len(sourceMeasureSchema.GetEntity().TagNames) {
+                                               break
+                                       }
+                               }
+                       }
+               }
+       }
+       if len(tags) != len(sourceMeasureSchema.GetEntity().TagNames) {
+               return nil, errors.New("failed to build topn schema, source 
measure schema is invalid:" + sourceMeasureSchema.String())
+       }
+       var fields []*databasev1.FieldSpec
+       for _, field := range sourceMeasureSchema.GetFields() {
+               if field.GetName() == fieldName {
+                       fields = append(fields, field)
+                       break
+               }
+       }
+       md := &databasev1.Measure{
+               Metadata: sourceMeasureSchema.Metadata,
+               TagFamilies: []*databasev1.TagFamilySpec{
+                       {
+                               Name: pkgschema.TopNTagFamily,
+                               Tags: tags,
                        },
                },
-       }, criteria.GetConditions()...)
+               Fields: fields,
+               Entity: &databasev1.Entity{
+                       TagNames: sourceMeasureSchema.Entity.TagNames,
+               },
+       }
+       ms := &schema{
+               common: &logical.CommonSchema{
+                       TagSpecMap: make(map[string]*logical.TagSpec),
+                       EntityList: md.GetEntity().GetTagNames(),
+               },
+               measure:  md,
+               fieldMap: make(map[string]*logical.FieldSpec),
+       }
+       ms.common.RegisterTagFamilies(md.GetTagFamilies())
+       for fieldIdx, spec := range md.GetFields() {
+               ms.registerField(fieldIdx, spec)
+       }
+       return ms, nil
 }
diff --git a/pkg/query/logical/measure/topn_plan_localscan.go 
b/pkg/query/logical/measure/topn_plan_localscan.go
index 9608af95..ee319af6 100644
--- a/pkg/query/logical/measure/topn_plan_localscan.go
+++ b/pkg/query/logical/measure/topn_plan_localscan.go
@@ -24,95 +24,105 @@ import (
        "time"
 
        "github.com/pkg/errors"
+       "google.golang.org/protobuf/types/known/timestamppb"
 
-       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
-       "github.com/apache/skywalking-banyandb/pkg/index"
-       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/banyand/measure"
+       "github.com/apache/skywalking-banyandb/pkg/encoding"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/query/executor"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
+       pkgschema "github.com/apache/skywalking-banyandb/pkg/schema"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
-var _ logical.UnresolvedPlan = (*unresolvedLocalScan)(nil)
+var (
+       _               logical.UnresolvedPlan = (*unresolvedLocalScan)(nil)
+       fieldProjection                        = 
[]string{pkgschema.TopNFieldName}
+)
 
 type unresolvedLocalScan struct {
-       startTime        time.Time
-       endTime          time.Time
-       metadata         *commonv1.Metadata
-       conditions       []*modelv1.Condition
-       projectionTags   [][]*logical.Tag
-       projectionFields []*logical.Field
-       sort             modelv1.Sort
+       startTime   time.Time
+       endTime     time.Time
+       name        string
+       conditions  []*modelv1.Condition
+       groupByTags []string
+       sort        modelv1.Sort
 }
 
 func (uls *unresolvedLocalScan) Analyze(s logical.Schema) (logical.Plan, 
error) {
-       var projTagsRefs [][]*logical.TagRef
-       projTags := make([]model.TagProjection, len(uls.projectionTags))
-       if len(uls.projectionTags) > 0 {
-               for i := range uls.projectionTags {
-                       for _, tag := range uls.projectionTags[i] {
-                               projTags[i].Family = tag.GetFamilyName()
-                               projTags[i].Names = append(projTags[i].Names, 
tag.GetTagName())
-                       }
-               }
-               var err error
-               projTagsRefs, err = s.CreateTagRef(uls.projectionTags...)
-               if err != nil {
-                       return nil, err
-               }
+       tr := timestamp.NewInclusiveTimeRange(uls.startTime, uls.endTime)
+       groupByTags, err := uls.parseGroupByTags()
+       if err != nil {
+               return nil, errors.Wrap(err, "failed to locate entity")
        }
-
-       var projFieldRefs []*logical.FieldRef
-       var projField []string
-       if len(uls.projectionFields) > 0 {
-               for i := range uls.projectionFields {
-                       projField = append(projField, 
uls.projectionFields[i].Name)
-               }
-               var err error
-               projFieldRefs, err = s.CreateFieldRef(uls.projectionFields...)
-               if err != nil {
-                       return nil, err
-               }
+       entities := [][]*modelv1.TagValue{
+               {
+                       {
+                               Value: &modelv1.TagValue_Str{
+                                       Str: &modelv1.Str{
+                                               Value: uls.name,
+                                       },
+                               },
+                       },
+                       {
+                               Value: &modelv1.TagValue_Int{
+                                       Int: &modelv1.Int{
+                                               Value: int64(uls.sort),
+                                       },
+                               },
+                       },
+                       pbv1.AnyTagValue,
+               },
        }
-
-       entity, err := uls.locateEntity(s.EntityList())
-       if err != nil {
-               return nil, err
+       if len(groupByTags) > 0 {
+               entities[0][len(entities[0])-1] = &modelv1.TagValue{
+                       Value: &modelv1.TagValue_Str{
+                               Str: &modelv1.Str{
+                                       Value: measure.GroupName(groupByTags),
+                               },
+                       },
+               }
        }
-
        return &localScan{
-               timeRange:            
timestamp.NewInclusiveTimeRange(uls.startTime, uls.endTime),
-               schema:               s,
-               projectionTagsRefs:   projTagsRefs,
-               projectionFieldsRefs: projFieldRefs,
-               projectionTags:       projTags,
-               projectionFields:     projField,
-               metadata:             uls.metadata,
-               entity:               entity,
-               l:                    logger.GetLogger("topn", "measure", 
uls.metadata.Group, uls.metadata.Name, "local-index"),
+               s: s,
+               options: model.MeasureQueryOptions{
+                       Name:      pkgschema.TopNSchemaName,
+                       TimeRange: &tr,
+                       Entities:  entities,
+                       TagProjection: []model.TagProjection{
+                               {
+                                       Family: pkgschema.TopNTagFamily,
+                                       Names:  pkgschema.TopNTagNames,
+                               },
+                       },
+                       FieldProjection: fieldProjection,
+               },
        }, nil
 }
 
-func (uls *unresolvedLocalScan) locateEntity(entityList []string) 
([]*modelv1.TagValue, error) {
+func (uls *unresolvedLocalScan) parseGroupByTags() ([]string, error) {
+       if len(uls.conditions) == 0 {
+               return nil, nil
+       }
        entityMap := make(map[string]int)
-       entity := make([]*modelv1.TagValue, len(entityList))
-       for idx, tagName := range entityList {
+       entity := make([]string, len(uls.groupByTags))
+       for idx, tagName := range uls.groupByTags {
                entityMap[tagName] = idx
-               // allow to make fuzzy search with partial conditions
-               entity[idx] = pbv1.AnyTagValue
        }
+       parsed := 0
        for _, pairQuery := range uls.conditions {
                if pairQuery.GetOp() != modelv1.Condition_BINARY_OP_EQ {
                        return nil, errors.Errorf("tag belongs to the entity 
only supports EQ operation in condition(%v)", pairQuery)
                }
                if entityIdx, ok := entityMap[pairQuery.GetName()]; ok {
-                       entity[entityIdx] = pairQuery.Value
                        switch pairQuery.GetValue().GetValue().(type) {
                        case *modelv1.TagValue_Str, *modelv1.TagValue_Int, 
*modelv1.TagValue_Null:
-                               entity[entityIdx] = pairQuery.Value
+                               entity[entityIdx] = 
measure.Stringify(pairQuery.Value)
+                               parsed++
                        default:
                                return nil, errors.New("unsupported condition 
tag type for entity")
                        }
@@ -120,61 +130,37 @@ func (uls *unresolvedLocalScan) locateEntity(entityList 
[]string) ([]*modelv1.Ta
                }
                return nil, errors.New("only groupBy tag name is supported")
        }
+       if parsed != len(uls.groupByTags) {
+               return nil, errors.New("failed to parse all groupBy tags")
+       }
 
        return entity, nil
 }
 
-func local(startTime, endTime time.Time, metadata *commonv1.Metadata, 
projectionTags [][]*logical.Tag,
-       projectionFields []*logical.Field, conditions []*modelv1.Condition, 
sort modelv1.Sort,
-) logical.UnresolvedPlan {
-       return &unresolvedLocalScan{
-               startTime:        startTime,
-               endTime:          endTime,
-               metadata:         metadata,
-               projectionTags:   projectionTags,
-               projectionFields: projectionFields,
-               conditions:       conditions,
-               sort:             sort,
-       }
-}
-
 var _ logical.Plan = (*localScan)(nil)
 
 type localScan struct {
-       schema               logical.Schema
-       metadata             *commonv1.Metadata
-       l                    *logger.Logger
-       timeRange            timestamp.TimeRange
-       projectionTagsRefs   [][]*logical.TagRef
-       projectionFieldsRefs []*logical.FieldRef
-       projectionTags       []model.TagProjection
-       projectionFields     []string
-       entity               []*modelv1.TagValue
-       sort                 modelv1.Sort
+       s       logical.Schema
+       options model.MeasureQueryOptions
 }
 
 func (i *localScan) Execute(ctx context.Context) (mit executor.MIterator, err 
error) {
        ec := executor.FromMeasureExecutionContext(ctx)
-       result, err := ec.Query(ctx, model.MeasureQueryOptions{
-               Name:            i.metadata.GetName(),
-               TimeRange:       &i.timeRange,
-               Entities:        [][]*modelv1.TagValue{i.entity},
-               Order:           &index.OrderBy{Sort: i.sort},
-               TagProjection:   i.projectionTags,
-               FieldProjection: i.projectionFields,
-       })
+       result, err := ec.Query(ctx, i.options)
        if err != nil {
                return nil, fmt.Errorf("failed to query measure: %w", err)
        }
-       return &resultMIterator{
-               result: result,
+       return &topNMIterator{
+               result:    result,
+               topNValue: measure.GenerateTopNValue(),
+               decoder:   generateTopNValuesDecoder(),
        }, nil
 }
 
 func (i *localScan) String() string {
-       return fmt.Sprintf("IndexScan: 
startTime=%d,endTime=%d,Metadata{group=%s,name=%s}; projection=%s; sort=%s;",
-               i.timeRange.Start.Unix(), i.timeRange.End.Unix(), 
i.metadata.GetGroup(), i.metadata.GetName(),
-               logical.FormatTagRefs(", ", i.projectionTagsRefs...), i.sort)
+       return fmt.Sprintf("TopNAggScan: %s startTime=%d,endTime=%d, 
entity=%s;",
+               i.options.Name, i.options.TimeRange.Start.Unix(), 
i.options.TimeRange.End.Unix(),
+               i.options.Entities)
 }
 
 func (i *localScan) Children() []logical.Plan {
@@ -182,8 +168,101 @@ func (i *localScan) Children() []logical.Plan {
 }
 
 func (i *localScan) Schema() logical.Schema {
-       if len(i.projectionTagsRefs) == 0 {
-               return i.schema
+       return i.s
+}
+
+type topNMIterator struct {
+       result    model.MeasureQueryResult
+       err       error
+       topNValue *measure.TopNValue
+       decoder   *encoding.BytesBlockDecoder
+       current   []*measurev1.DataPoint
+}
+
+func (ei *topNMIterator) Next() bool {
+       if ei.result == nil {
+               return false
        }
-       return 
i.schema.ProjTags(i.projectionTagsRefs...).ProjFields(i.projectionFieldsRefs...)
+
+       r := ei.result.Pull()
+       if r == nil {
+               return false
+       }
+       if r.Error != nil {
+               ei.err = r.Error
+               return false
+       }
+       ei.current = ei.current[:0]
+
+       for i := range r.Timestamps {
+               fv := r.Fields[0].Values[i]
+               bd := fv.GetBinaryData()
+               if bd == nil {
+                       ei.err = errors.New("failed to get binary data")
+                       return false
+               }
+               ei.topNValue.Reset()
+               ei.err = ei.topNValue.Unmarshal(bd, ei.decoder)
+               if ei.err != nil {
+                       return false
+               }
+               fieldName, entityNames, values, entities := 
ei.topNValue.Values()
+               for j := range entities {
+                       dp := &measurev1.DataPoint{
+                               Timestamp: timestamppb.New(time.Unix(0, 
r.Timestamps[i])),
+                               Sid:       uint64(r.SID),
+                               Version:   r.Versions[i],
+                       }
+                       tagFamily := &modelv1.TagFamily{
+                               Name: pkgschema.TopNTagFamily,
+                       }
+                       dp.TagFamilies = append(dp.TagFamilies, tagFamily)
+                       for k, entityName := range entityNames {
+                               tagFamily.Tags = append(tagFamily.Tags, 
&modelv1.Tag{
+                                       Key:   entityName,
+                                       Value: entities[j][k],
+                               })
+                       }
+                       dp.Fields = append(dp.Fields, 
&measurev1.DataPoint_Field{
+                               Name: fieldName,
+                               Value: &modelv1.FieldValue{
+                                       Value: &modelv1.FieldValue_Int{
+                                               Int: &modelv1.Int{
+                                                       Value: values[j],
+                                               },
+                                       },
+                               },
+                       })
+                       ei.current = append(ei.current, dp)
+               }
+       }
+       return true
 }
+
+func (ei *topNMIterator) Current() []*measurev1.DataPoint {
+       return ei.current
+}
+
+func (ei *topNMIterator) Close() error {
+       if ei.result != nil {
+               ei.result.Release()
+       }
+       releaseTopNValuesDecoder(ei.decoder)
+       measure.ReleaseTopNValue(ei.topNValue)
+       return ei.err
+}
+
+func generateTopNValuesDecoder() *encoding.BytesBlockDecoder {
+       v := topNValuesDecoderPool.Get()
+       if v == nil {
+               return &encoding.BytesBlockDecoder{}
+       }
+       return v
+}
+
+func releaseTopNValuesDecoder(d *encoding.BytesBlockDecoder) {
+       d.Reset()
+       topNValuesDecoderPool.Put(d)
+}
+
+var topNValuesDecoderPool = 
pool.Register[*encoding.BytesBlockDecoder]("topn-valueDecoder")
diff --git a/pkg/schema/cache.go b/pkg/schema/cache.go
index 9abe21ec..7fcd31f4 100644
--- a/pkg/schema/cache.go
+++ b/pkg/schema/cache.go
@@ -203,7 +203,7 @@ func (sr *schemaRepo) Watcher() {
                                                        err = 
sr.initResource(evt.Metadata.GetMetadata())
                                                case EventKindTopNAgg:
                                                        topNSchema := 
evt.Metadata.(*databasev1.TopNAggregation)
-                                                       _, err = 
createOrUpdateTopNMeasure(context.Background(), sr.metadata.MeasureRegistry(), 
topNSchema)
+                                                       err = 
createTopNResultMeasure(context.Background(), sr.metadata.MeasureRegistry(), 
topNSchema.GetMetadata().Group)
                                                        if err != nil {
                                                                break
                                                        }
@@ -217,10 +217,7 @@ func (sr *schemaRepo) Watcher() {
                                                        err = 
sr.deleteResource(evt.Metadata.GetMetadata())
                                                case EventKindTopNAgg:
                                                        topNSchema := 
evt.Metadata.(*databasev1.TopNAggregation)
-                                                       err = multierr.Combine(
-                                                               
sr.deleteResource(topNSchema.SourceMeasure),
-                                                               
sr.initResource(topNSchema.SourceMeasure),
-                                                       )
+                                                       err = 
sr.initResource(topNSchema.SourceMeasure)
                                                }
                                        }
                                        if err != nil && !errors.Is(err, 
schema.ErrClosed) {
diff --git a/pkg/schema/init.go b/pkg/schema/init.go
index a158daab..5050e4bc 100644
--- a/pkg/schema/init.go
+++ b/pkg/schema/init.go
@@ -22,9 +22,7 @@ import (
        "fmt"
        "time"
 
-       "github.com/google/go-cmp/cmp"
        "github.com/pkg/errors"
-       "google.golang.org/protobuf/testing/protocmp"
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
@@ -32,16 +30,23 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
-const topNTagFamily = "__topN__"
+const (
+       // TopNTagFamily is the tag family name of the topN result measure.
+       TopNTagFamily = "_topN"
+       // TopNFieldName is the field name of the topN result measure.
+       TopNFieldName = "value"
+)
 
 var (
-       initTimeout        = 10 * time.Second
-       topNValueFieldSpec = &databasev1.FieldSpec{
-               Name:              "value",
-               FieldType:         databasev1.FieldType_FIELD_TYPE_INT,
+       initTimeout    = 10 * time.Second
+       topNFieldsSpec = []*databasev1.FieldSpec{{
+               Name:              TopNFieldName,
+               FieldType:         databasev1.FieldType_FIELD_TYPE_DATA_BINARY,
                EncodingMethod:    
databasev1.EncodingMethod_ENCODING_METHOD_GORILLA,
                CompressionMethod: 
databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD,
-       }
+       }}
+       // TopNTagNames is the tag names of the topN result measure.
+       TopNTagNames = []string{"name", "direction", "group"}
 )
 
 type revisionContext struct {
@@ -154,6 +159,12 @@ func (sr *schemaRepo) getRules(ctx context.Context, gName 
string) map[string]*da
 
 func (sr *schemaRepo) processMeasure(ctx context.Context, gName string, group 
*group, bindings map[string][]string, rules map[string]*databasev1.IndexRule) {
        aggMap := sr.getAggMap(ctx, gName)
+       if len(aggMap) > 0 {
+               if err := createTopNResultMeasure(ctx, 
sr.metadata.MeasureRegistry(), gName); err != nil {
+                       logger.Panicf("fails to create the topN result measure: 
%v", err)
+                       return
+               }
+       }
 
        ctx, cancel := context.WithTimeout(ctx, initTimeout)
        defer cancel()
@@ -163,30 +174,8 @@ func (sr *schemaRepo) processMeasure(ctx context.Context, 
gName string, group *g
                logger.Panicf("fails to get the measures: %v", err)
                return
        }
-       revCtx := ctx.Value(revCtxKey).(*revisionContext)
-       measureMap := make(map[string]*databasev1.Measure, len(mm))
-       for _, m := range mm {
-               measureMap[m.Metadata.Name] = m
-       }
-       for _, aa := range aggMap {
-               for _, a := range aa {
-                       sourceMeasre, ok := measureMap[a.SourceMeasure.Name]
-                       if !ok {
-                               sr.l.Warn().Str("group", gName).Str("measure", 
a.SourceMeasure.Name).Str("agg", a.Metadata.Name).Msg("source measure not 
found")
-                               continue
-                       }
-                       if _, ok := measureMap[a.Metadata.Name]; !ok {
-                               sr.l.Info().Str("group", gName).Str("measure", 
a.SourceMeasure.Name).Str("agg", a.Metadata.Name).Msg("remove topN aggregation")
-                               m, err := createTopNMeasure(ctx, 
sr.metadata.MeasureRegistry(), a, sourceMeasre)
-                               if err != nil {
-                                       logger.Panicf("fails to create or 
update the topN measure: %v", err)
-                                       return
-                               }
-                               mm = append(mm, m)
-                       }
-               }
-       }
 
+       revCtx := ctx.Value(revCtxKey).(*revisionContext)
        for _, m := range mm {
                if m.Metadata.ModRevision > revCtx.measure {
                        revCtx.measure = m.Metadata.ModRevision
@@ -228,7 +217,9 @@ func (sr *schemaRepo) storeMeasure(m *databasev1.Measure, 
group *group, bindings
        var indexRules []*databasev1.IndexRule
        if rr, ok := bindings[m.Metadata.GetName()]; ok {
                for _, r := range rr {
-                       indexRules = append(indexRules, rules[r])
+                       if rule, ok := rules[r]; ok {
+                               indexRules = append(indexRules, rule)
+                       }
                }
        }
        var topNAggr []*databasev1.TopNAggregation
@@ -263,7 +254,9 @@ func (sr *schemaRepo) storeStream(s *databasev1.Stream, 
group *group, bindings m
        var indexRules []*databasev1.IndexRule
        if rr, ok := bindings[s.Metadata.GetName()]; ok {
                for _, r := range rr {
-                       indexRules = append(indexRules, rules[r])
+                       if rule, ok := rules[r]; ok {
+                               indexRules = append(indexRules, rule)
+                       }
                }
        }
        if err := sr.storeResource(group, s, indexRules, nil); err != nil {
@@ -284,107 +277,50 @@ func (sr *schemaRepo) initGroup(groupSchema 
*commonv1.Group) (*group, error) {
        return g, nil
 }
 
-func createOrUpdateTopNMeasure(ctx context.Context, measureSchemaRegistry 
schema.Measure, topNSchema *databasev1.TopNAggregation) (*databasev1.Measure, 
error) {
-       oldTopNSchema, err := measureSchemaRegistry.GetMeasure(ctx, 
topNSchema.GetMetadata())
+func createTopNResultMeasure(ctx context.Context, measureSchemaRegistry 
schema.Measure, group string) error {
+       md := GetTopNSchemaMetadata(group)
+       m, err := measureSchemaRegistry.GetMeasure(ctx, md)
        if err != nil && !errors.Is(err, schema.ErrGRPCResourceNotFound) {
-               return nil, errors.WithMessagef(err, "fail to get current topN 
measure %s", topNSchema.GetMetadata().GetName())
+               return errors.WithMessagef(err, "fail to get %s", md)
        }
-
-       sourceMeasureSchema, err := measureSchemaRegistry.GetMeasure(ctx, 
topNSchema.GetSourceMeasure())
-       if err != nil {
-               return nil, errors.WithMessagef(err, "fail to get source 
measure %s", topNSchema.GetSourceMeasure().GetName())
+       if m != nil {
+               return nil
        }
 
-       // create a new "derived" measure for TopN result
-       newTopNMeasure, err := buildTopNSourceMeasure(topNSchema, 
sourceMeasureSchema)
-       if err != nil {
-               return nil, err
-       }
-       if oldTopNSchema == nil {
-               if _, innerErr := measureSchemaRegistry.CreateMeasure(ctx, 
newTopNMeasure); innerErr != nil {
-                       if !errors.Is(innerErr, schema.ErrGRPCAlreadyExists) {
-                               return nil, errors.WithMessagef(innerErr, "fail 
to create new topN measure %s", newTopNMeasure.GetMetadata().GetName())
-                       }
-                       newTopNMeasure, err = 
measureSchemaRegistry.GetMeasure(ctx, topNSchema.GetMetadata())
-                       if err != nil {
-                               return nil, errors.WithMessagef(err, "fail to 
get created topN measure %s", topNSchema.GetMetadata().GetName())
-                       }
+       m = GetTopNSchema(md)
+       if _, innerErr := measureSchemaRegistry.CreateMeasure(ctx, m); innerErr 
!= nil {
+               if !errors.Is(innerErr, schema.ErrGRPCAlreadyExists) {
+                       return errors.WithMessagef(innerErr, "fail to create 
new topN measure %s", m)
                }
-               return newTopNMeasure, nil
-       }
-       // compare with the old one
-       if cmp.Diff(newTopNMeasure, oldTopNSchema,
-               protocmp.IgnoreUnknown(),
-               protocmp.IgnoreFields(&databasev1.Measure{}, "updated_at"),
-               protocmp.IgnoreFields(&commonv1.Metadata{}, "id", 
"create_revision", "mod_revision"),
-               protocmp.Transform()) == "" {
-               return oldTopNSchema, nil
-       }
-       // update
-       if _, err = measureSchemaRegistry.UpdateMeasure(ctx, newTopNMeasure); 
err != nil {
-               return nil, errors.WithMessagef(err, "fail to update topN 
measure %s", newTopNMeasure.GetMetadata().GetName())
-       }
-       return newTopNMeasure, nil
-}
-
-func createTopNMeasure(ctx context.Context, measureSchemaRegistry 
schema.Measure, topNSchema *databasev1.TopNAggregation,
-       sourceMeasureSchema *databasev1.Measure,
-) (*databasev1.Measure, error) {
-       newTopNMeasure, err := buildTopNSourceMeasure(topNSchema, 
sourceMeasureSchema)
-       if err != nil {
-               return nil, err
-       }
-       if _, err := measureSchemaRegistry.CreateMeasure(ctx, newTopNMeasure); 
err != nil {
-               return nil, err
        }
-       return newTopNMeasure, nil
+       return nil
 }
 
-func buildTopNSourceMeasure(topNSchema *databasev1.TopNAggregation, 
sourceMeasureSchema *databasev1.Measure) (*databasev1.Measure, error) {
-       tagNames := sourceMeasureSchema.GetEntity().GetTagNames()
-       seriesSpecs := make([]*databasev1.TagSpec, 0, len(tagNames))
-
-       for _, tagName := range tagNames {
-               var found bool
-               for _, fSpec := range sourceMeasureSchema.GetTagFamilies() {
-                       for _, tSpec := range fSpec.GetTags() {
-                               if tSpec.GetName() == tagName {
-                                       seriesSpecs = append(seriesSpecs, tSpec)
-                                       found = true
-                                       goto CHECK
-                               }
-                       }
-               }
-
-       CHECK:
-               if !found {
-                       return nil, fmt.Errorf("fail to find tag spec %s", 
tagName)
-               }
-       }
-       // create a new "derived" measure for TopN result
+// GetTopNSchema returns the schema of the topN result measure.
+func GetTopNSchema(md *commonv1.Metadata) *databasev1.Measure {
        return &databasev1.Measure{
-               Metadata: topNSchema.Metadata,
-               Interval: sourceMeasureSchema.GetInterval(),
+               Metadata: md,
                TagFamilies: []*databasev1.TagFamilySpec{
                        {
-                               Name: topNTagFamily,
-                               Tags: append(seriesSpecs,
-                                       &databasev1.TagSpec{
-                                               Name: "sortDirection",
-                                               Type: 
databasev1.TagType_TAG_TYPE_INT,
-                                       },
-                                       &databasev1.TagSpec{
-                                               Name: "rankNumber",
-                                               Type: 
databasev1.TagType_TAG_TYPE_INT,
-                                       },
-                               ),
+                               Name: TopNTagFamily,
+                               Tags: []*databasev1.TagSpec{
+                                       {Name: TopNTagNames[0], Type: 
databasev1.TagType_TAG_TYPE_STRING},
+                                       {Name: TopNTagNames[1], Type: 
databasev1.TagType_TAG_TYPE_INT},
+                                       {Name: TopNTagNames[2], Type: 
databasev1.TagType_TAG_TYPE_STRING},
+                               },
                        },
                },
-               Fields: []*databasev1.FieldSpec{topNValueFieldSpec},
+               Fields: topNFieldsSpec,
                Entity: &databasev1.Entity{
-                       TagNames: append(topNSchema.GetGroupByTagNames(),
-                               "sortDirection",
-                               "rankNumber"),
+                       TagNames: TopNTagNames,
                },
-       }, nil
+       }
+}
+
+// GetTopNSchemaMetadata returns the metadata of the topN result measure.
+func GetTopNSchemaMetadata(group string) *commonv1.Metadata {
+       return &commonv1.Metadata{
+               Name:  TopNSchemaName,
+               Group: group,
+       }
 }
diff --git a/pkg/schema/schema.go b/pkg/schema/schema.go
index 2ac35232..6ee3c2d0 100644
--- a/pkg/schema/schema.go
+++ b/pkg/schema/schema.go
@@ -26,6 +26,9 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
 )
 
+// TopNSchemaName is the name of the topN result schema.
+const TopNSchemaName = "_top_n_result"
+
 // EventType defines actions of events.
 type EventType uint8
 
diff --git a/test/cases/topn/data/want/aggr_desc.yaml 
b/test/cases/topn/data/want/aggr_desc.yaml
index b39f77e7..27895621 100644
--- a/test/cases/topn/data/want/aggr_desc.yaml
+++ b/test/cases/topn/data/want/aggr_desc.yaml
@@ -44,12 +44,11 @@ lists:
   - entity:
     - key: service_id
       value:
-        str:
-          value: ""
+        "null": null
     - key: entity_id
       value:
         str:
           value: entity_2
     value:
       int:
-        value: "10"
\ No newline at end of file
+        value: "10"

Reply via email to