This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new a69048c4 Fix TopN processor not handling data points with specifications correctly. (#931)
a69048c4 is described below

commit a69048c4ff85f437002edacf83918c8f8ba5951a
Author: Gao Hongtao <[email protected]>
AuthorDate: Fri Jan 9 09:25:15 2026 +0800

    Fix TopN processor not handling data points with specifications correctly. (#931)
---
 banyand/measure/svc_liaison.go      |  10 -
 banyand/measure/topn.go             | 227 +++++++----
 banyand/measure/topn_test.go        | 726 ++++++++++++++++++++++++++++++++++++
 banyand/measure/write_liaison.go    |   2 +-
 banyand/measure/write_standalone.go |   2 +-
 5 files changed, 884 insertions(+), 83 deletions(-)

diff --git a/banyand/measure/svc_liaison.go b/banyand/measure/svc_liaison.go
index e9587831..453e087e 100644
--- a/banyand/measure/svc_liaison.go
+++ b/banyand/measure/svc_liaison.go
@@ -29,8 +29,6 @@ import (
        "github.com/apache/skywalking-banyandb/api/data"
        commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
-       measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
-       modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
@@ -166,14 +164,6 @@ func (s *liaison) GracefulStop() {
        s.schemaRepo.Close()
 }
 
-func (s *liaison) InFlow(stm *databasev1.Measure, seriesID uint64, shardID uint32, entityValues []*modelv1.TagValue, dp *measurev1.DataPointValue) {
-       if s.schemaRepo == nil {
-               s.l.Error().Msg("schema repository is not initialized")
-               return
-       }
-       s.schemaRepo.inFlow(stm, seriesID, shardID, entityValues, dp)
-}
-
 // NewLiaison creates a new measure liaison service with the given dependencies.
 func NewLiaison(metadata metadata.Repo, pipeline queue.Server, omr observability.MetricsRegistry, pm protector.Memory,
        dataNodeSelector node.Selector, tire2Client queue.Client,
diff --git a/banyand/measure/topn.go b/banyand/measure/topn.go
index f4a62abf..d595b105 100644
--- a/banyand/measure/topn.go
+++ b/banyand/measure/topn.go
@@ -47,7 +47,6 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/flow/streaming"
        "github.com/apache/skywalking-banyandb/pkg/flow/streaming/sources"
        "github.com/apache/skywalking-banyandb/pkg/logger"
-       "github.com/apache/skywalking-banyandb/pkg/partition"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
@@ -66,13 +65,21 @@ var (
        _ flow.Sink = (*topNStreamingProcessor)(nil)
 )
 
-func (sr *schemaRepo) inFlow(stm *databasev1.Measure, seriesID uint64, shardID uint32, entityValues []*modelv1.TagValue, dp *measurev1.DataPointValue) {
+func (sr *schemaRepo) inFlow(
+       stm *databasev1.Measure,
+       seriesID uint64,
+       shardID uint32,
+       entityValues []*modelv1.TagValue,
+       dp *measurev1.DataPointValue,
+       spec *measurev1.DataPointSpec,
+) {
        if p, _ := sr.topNProcessorMap.Load(getKey(stm.GetMetadata())); p != nil {
                p.(*topNProcessorManager).onMeasureWrite(seriesID, shardID, &measurev1.InternalWriteRequest{
                        Request: &measurev1.WriteRequest{
-                               Metadata:  stm.GetMetadata(),
-                               DataPoint: dp,
-                               MessageId: uint64(time.Now().UnixNano()),
+                               Metadata:      stm.GetMetadata(),
+                               DataPoint:     dp,
+                               MessageId:     uint64(time.Now().UnixNano()),
+                               DataPointSpec: spec,
                        },
                        EntityValues: entityValues,
                }, stm)
@@ -136,12 +143,129 @@ func (sr *schemaRepo) stopSteamingManager(sourceMeasure *commonv1.Metadata) {
 }
 
 type dataPointWithEntityValues struct {
+       tagSpec logical.TagSpecRegistry
        *measurev1.DataPointValue
+       fieldIndex   map[string]int
        entityValues []*modelv1.TagValue
        seriesID     uint64
        shardID      uint32
 }
 
+func newDataPointWithEntityValues(
+       dp *measurev1.DataPointValue,
+       entityValues []*modelv1.TagValue,
+       seriesID uint64,
+       shardID uint32,
+       spec *measurev1.DataPointSpec,
+       m *databasev1.Measure,
+) *dataPointWithEntityValues {
+       fieldIndex := buildFieldIndex(spec, m)
+       tagSpec := buildTagSpecRegistryFromSpec(spec, m)
+       return &dataPointWithEntityValues{
+               DataPointValue: dp,
+               entityValues:   entityValues,
+               seriesID:       seriesID,
+               shardID:        shardID,
+               tagSpec:        tagSpec,
+               fieldIndex:     fieldIndex,
+       }
+}
+
+func buildFieldIndex(spec *measurev1.DataPointSpec, m *databasev1.Measure) map[string]int {
+       if spec != nil {
+               fieldIndex := make(map[string]int, len(spec.GetFieldNames()))
+               for i, fieldName := range spec.GetFieldNames() {
+                       fieldIndex[fieldName] = i
+               }
+               return fieldIndex
+       }
+       if m != nil {
+               fieldIndex := make(map[string]int, len(m.GetFields()))
+               for i, fieldSpec := range m.GetFields() {
+                       fieldIndex[fieldSpec.GetName()] = i
+               }
+               return fieldIndex
+       }
+       return make(map[string]int)
+}
+
+func buildTagSpecRegistryFromSpec(spec *measurev1.DataPointSpec, m *databasev1.Measure) logical.TagSpecRegistry {
+       tagSpecMap := logical.TagSpecMap{}
+       if spec != nil {
+               for specFamilyIdx, specFamily := range spec.GetTagFamilySpec() {
+                       for specTagIdx, tagName := range specFamily.GetTagNames() {
+                               tagSpec := &databasev1.TagSpec{
+                                       Name: tagName,
+                               }
+                               tagSpecMap.RegisterTag(specFamilyIdx, specTagIdx, tagSpec)
+                       }
+               }
+               return tagSpecMap
+       }
+       if m != nil {
+               for specFamilyIdx, tagFamily := range m.GetTagFamilies() {
+                       for specTagIdx, tagSpec := range tagFamily.GetTags() {
+                               tagSpecMap.RegisterTag(specFamilyIdx, specTagIdx, tagSpec)
+                       }
+               }
+               return tagSpecMap
+       }
+       return tagSpecMap
+}
+
+func (dp *dataPointWithEntityValues) tagValue(tagName string) *modelv1.TagValue {
+       if familyIdx, tagIdx, ok := dp.locateSpecTag(tagName); ok {
+               if familyIdx < len(dp.GetTagFamilies()) {
+                       tagFamily := dp.GetTagFamilies()[familyIdx]
+                       if tagIdx < len(tagFamily.GetTags()) {
+                               return tagFamily.GetTags()[tagIdx]
+                       }
+               }
+               return pbv1.NullTagValue
+       }
+       return pbv1.NullTagValue
+}
+
+func (dp *dataPointWithEntityValues) locateSpecTag(tagName string) (int, int, bool) {
+       if dp.tagSpec == nil {
+               return 0, 0, false
+       }
+       tagSpecFound := dp.tagSpec.FindTagSpecByName(tagName)
+       if tagSpecFound == nil {
+               return 0, 0, false
+       }
+       familyIdx := tagSpecFound.TagFamilyIdx
+       tagIdx := tagSpecFound.TagIdx
+       if familyIdx < 0 || tagIdx < 0 {
+               return 0, 0, false
+       }
+       return familyIdx, tagIdx, true
+}
+
+func (dp *dataPointWithEntityValues) intFieldValue(fieldName string, l *logger.Logger) int64 {
+       if dp.fieldIndex == nil {
+               return 0
+       }
+       fieldIdx, ok := dp.fieldIndex[fieldName]
+       if !ok {
+               return 0
+       }
+       if fieldIdx >= len(dp.GetFields()) {
+               if l != nil {
+                       l.Warn().Str("fieldName", fieldName).
+                               Int("len", len(dp.GetFields())).
+                               Int("fieldIdx", fieldIdx).
+                               Msg("field index out of range")
+               }
+               return 0
+       }
+       field := dp.GetFields()[fieldIdx]
+       if field.GetInt() == nil {
+               return 0
+       }
+       return field.GetInt().GetValue()
+}
+
 type topNStreamingProcessor struct {
        pipeline      queue.Client
        streamingFlow flow.Flow
@@ -350,7 +474,6 @@ func (t *topNStreamingProcessor) handleError() {
 // topNProcessorManager manages multiple topNStreamingProcessor(s) belonging to a single measure.
 type topNProcessorManager struct {
        pipeline        queue.Client
-       s               logical.TagSpecRegistry
        l               *logger.Logger
        m               *databasev1.Measure
        nodeID          string
@@ -370,9 +493,6 @@ func (manager *topNProcessorManager) init(m *databasev1.Measure) {
                return
        }
        manager.m = m
-       tagMapSpec := logical.TagSpecMap{}
-       tagMapSpec.RegisterTagFamilies(m.GetTagFamilies())
-       manager.s = tagMapSpec
        for i := range manager.registeredTasks {
                if err := manager.start(manager.registeredTasks[i]); err != nil {
                        manager.l.Err(err).Msg("fail to start processor")
@@ -393,7 +513,6 @@ func (manager *topNProcessorManager) Close() error {
        }
        manager.processorList = nil
        manager.registeredTasks = nil
-       manager.s = nil
        manager.m = nil
        return err
 }
@@ -410,13 +529,18 @@ func (manager *topNProcessorManager) onMeasureWrite(seriesID uint64, shardID uin
                        manager.init(measure)
                        manager.RLock()
                }
+               dp := request.GetRequest().GetDataPoint()
+               spec := request.GetRequest().GetDataPointSpec()
                for _, processor := range manager.processorList {
-                       processor.src <- flow.NewStreamRecordWithTimestampPb(&dataPointWithEntityValues{
-                               request.GetRequest().GetDataPoint(),
+                       dpWithEntity := newDataPointWithEntityValues(
+                               dp,
                                request.GetEntityValues(),
                                seriesID,
                                shardID,
-                       }, request.GetRequest().GetDataPoint().GetTimestamp())
+                               spec,
+                               manager.m,
+                       )
+                       processor.src <- flow.NewStreamRecordWithTimestampPb(dpWithEntity, dp.GetTimestamp())
                }
        }()
 }
@@ -537,7 +661,8 @@ func (manager *topNProcessorManager) buildFilter(criteria *modelv1.Criteria) (fl
 
        return func(_ context.Context, request any) bool {
                tffws := request.(*dataPointWithEntityValues).GetTagFamilies()
-               ok, matchErr := f.Match(logical.TagFamiliesForWrite(tffws), manager.s)
+               tagSpec := request.(*dataPointWithEntityValues).tagSpec
+               ok, matchErr := f.Match(logical.TagFamiliesForWrite(tffws), tagSpec)
                if matchErr != nil {
                        manager.l.Err(matchErr).Msg("fail to match criteria")
                        return false
@@ -558,30 +683,24 @@ func (manager *topNProcessorManager) buildMapper(fieldName string, groupByNames
        if len(groupByNames) == 0 {
                return func(_ context.Context, request any) any {
                        dpWithEvs := request.(*dataPointWithEntityValues)
-                       if len(dpWithEvs.GetFields()) <= fieldIdx {
-                               manager.l.Warn().Interface("point", dpWithEvs.DataPointValue).
-                                       Str("fieldName", fieldName).
-                                       Int("len", len(dpWithEvs.GetFields())).
-                                       Int("fieldIdx", fieldIdx).
-                                       Msg("out of range")
-                       }
                        return flow.Data{
-                               // EntityValues as identity
                                dpWithEvs.entityValues,
-                               // save string representation of group values as the key, i.e. v1
                                "",
-                               // field value as v2
-                               dpWithEvs.GetFields()[fieldIdx].GetInt().GetValue(),
-                               // shardID values as v3
+                               dpWithEvs.intFieldValue(fieldName, manager.l),
                                dpWithEvs.shardID,
-                               // seriesID values as v4
                                dpWithEvs.seriesID,
                        }
                }, nil
        }
-       groupLocator, removedTags := newGroupLocator(manager.m, groupByNames)
+       var removedTags []string
+       for i := range groupByNames {
+               _, _, tagSpec := pbv1.FindTagByName(manager.m.GetTagFamilies(), groupByNames[i])
+               if tagSpec == nil {
+                       removedTags = append(removedTags, groupByNames[i])
+               }
+       }
        if len(removedTags) > 0 {
-               if len(groupLocator) == 0 {
+               if len(removedTags) == len(groupByNames) {
                        manager.l.Warn().Strs("removedTags", removedTags).Str("measure", manager.m.Metadata.GetName()).
                                Msg("TopNAggregation references removed tags which no longer exist in schema, all groupBy tags were removed")
                        return nil, fmt.Errorf("all groupBy tags [%s] no longer exist in %s schema, TopNAggregation is invalid",
@@ -592,55 +711,21 @@ func (manager *topNProcessorManager) buildMapper(fieldName string, groupByNames
        }
        return func(_ context.Context, request any) any {
                dpWithEvs := request.(*dataPointWithEntityValues)
+               groupValues := make([]string, 0, len(groupByNames))
+               for i := range groupByNames {
+                       tagValue := dpWithEvs.tagValue(groupByNames[i])
+                       groupValues = append(groupValues, Stringify(tagValue))
+               }
                return flow.Data{
-                       // EntityValues as identity
                        dpWithEvs.entityValues,
-                       // save string representation of group values as the key, i.e. v1
-                       GroupName(transform(groupLocator, func(locator partition.TagLocator) string {
-                               return Stringify(extractTagValue(dpWithEvs.DataPointValue, locator))
-                       })),
-                       // field value as v2
-                       dpWithEvs.GetFields()[fieldIdx].GetInt().GetValue(),
-                       // shardID values as v3
+                       GroupName(groupValues),
+                       dpWithEvs.intFieldValue(fieldName, manager.l),
                        dpWithEvs.shardID,
-                       // seriesID values as v4
                        dpWithEvs.seriesID,
                }
        }, nil
 }
 
-// groupTagsLocator can be used to locate tags within families.
-type groupTagsLocator []partition.TagLocator
-
-// newGroupLocator generates a groupTagsLocator which strictly preserve the order of groupByNames.
-func newGroupLocator(m *databasev1.Measure, groupByNames []string) (groupTagsLocator, []string) {
-       groupTags := make([]partition.TagLocator, 0, len(groupByNames))
-       var removedTags []string
-       for _, groupByName := range groupByNames {
-               fIdx, tIdx, spec := pbv1.FindTagByName(m.GetTagFamilies(), groupByName)
-               if spec == nil {
-                       removedTags = append(removedTags, groupByName)
-                       continue
-               }
-               groupTags = append(groupTags, partition.TagLocator{
-                       FamilyOffset: fIdx,
-                       TagOffset:    tIdx,
-               })
-       }
-       return groupTags, removedTags
-}
-
-func extractTagValue(dpv *measurev1.DataPointValue, locator partition.TagLocator) *modelv1.TagValue {
-       if locator.FamilyOffset >= len(dpv.GetTagFamilies()) {
-               return &modelv1.TagValue{Value: &modelv1.TagValue_Null{}}
-       }
-       tagFamily := dpv.GetTagFamilies()[locator.FamilyOffset]
-       if locator.TagOffset >= len(tagFamily.GetTags()) {
-               return &modelv1.TagValue{Value: &modelv1.TagValue_Null{}}
-       }
-       return tagFamily.GetTags()[locator.TagOffset]
-}
-
 // Stringify converts a TagValue to a string.
 func Stringify(tagValue *modelv1.TagValue) string {
        switch v := tagValue.GetValue().(type) {
@@ -741,7 +826,7 @@ func (t *TopNValue) resizeEntityValues(size int) [][]byte {
        return t.entityValues
 }
 
-func (t *TopNValue) resizeEntities(size int, entitySize int) [][]*modelv1.TagValue {
+func (t *TopNValue) resizeEntities(size, entitySize int) [][]*modelv1.TagValue {
        entities := t.entities
        if n := size - cap(t.entities); n > 0 {
                entities = append(entities[:cap(entities)], make([][]*modelv1.TagValue, n)...)
diff --git a/banyand/measure/topn_test.go b/banyand/measure/topn_test.go
index 385d39c5..95c0deaf 100644
--- a/banyand/measure/topn_test.go
+++ b/banyand/measure/topn_test.go
@@ -18,14 +18,20 @@
 package measure
 
 import (
+       "encoding/base64"
        "testing"
 
        "github.com/google/go-cmp/cmp"
        "github.com/stretchr/testify/require"
        "google.golang.org/protobuf/proto"
        "google.golang.org/protobuf/testing/protocmp"
+       "google.golang.org/protobuf/types/known/timestamppb"
 
+       databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/logical"
 )
 
 func TestTopNValue_MarshalUnmarshal(t *testing.T) {
@@ -112,3 +118,723 @@ func TestTopNValue_MarshalUnmarshal(t *testing.T) {
                })
        }
 }
+
+func TestTopNValue_Marshal_EmptyValues(t *testing.T) {
+       topNVal := &TopNValue{
+               valueName:      "testValue",
+               entityTagNames: []string{"tag1"},
+               values:         []int64{},
+               entities:       [][]*modelv1.TagValue{},
+       }
+       _, err := topNVal.marshal(nil)
+       require.Error(t, err)
+       require.Contains(t, err.Error(), "values is empty")
+}
+
+func TestTopNValue_Unmarshal_InvalidData(t *testing.T) {
+       decoder := generateColumnValuesDecoder()
+       defer releaseColumnValuesDecoder(decoder)
+
+       tests := []struct {
+               name    string
+               wantErr string
+               src     []byte
+       }{
+               {
+                       name:    "empty src",
+                       src:     []byte{},
+                       wantErr: "cannot unmarshal topNValue",
+               },
+               {
+                       name:    "truncated after name",
+                       src:     []byte{0x01, 0x01, 'a'},
+                       wantErr: "cannot unmarshal topNValue",
+               },
+               {
+                       name:    "truncated after encodeType",
+                       src:     []byte{0x01, 0x01, 'a', 0x00, 0x00},
+                       wantErr: "cannot unmarshal topNValue",
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       topNVal := &TopNValue{}
+                       err := topNVal.Unmarshal(tt.src, decoder)
+                       require.Error(t, err)
+                       require.Contains(t, err.Error(), tt.wantErr)
+               })
+       }
+}
+
+func TestTopNValue_SetMetadata(t *testing.T) {
+       topNVal := &TopNValue{}
+       topNVal.setMetadata("testValue", []string{"tag1", "tag2"})
+       require.Equal(t, "testValue", topNVal.valueName)
+       require.Equal(t, []string{"tag1", "tag2"}, topNVal.entityTagNames)
+}
+
+func TestTopNValue_AddValue(t *testing.T) {
+       topNVal := &TopNValue{}
+       entityValues := []*modelv1.TagValue{
+               {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc1"}}},
+               {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "entity1"}}},
+       }
+       topNVal.addValue(100, entityValues)
+       require.Equal(t, []int64{100}, topNVal.values)
+       require.Len(t, topNVal.entities, 1)
+       require.Len(t, topNVal.entities[0], 2)
+       require.Equal(t, entityValues, topNVal.entities[0], "entityValues should be copied and equal")
+       require.NotSame(t, &entityValues[0], &topNVal.entities[0][0], "entityValues should be a copy, not the same slice")
+}
+
+func TestTopNValue_Values(t *testing.T) {
+       topNVal := &TopNValue{
+               valueName:      "testValue",
+               entityTagNames: []string{"tag1", "tag2"},
+               values:         []int64{1, 2, 3},
+               entities: [][]*modelv1.TagValue{
+                       {
+                               {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc1"}}},
+                       },
+               },
+       }
+       valueName, entityTagNames, values, entities := topNVal.Values()
+       require.Equal(t, "testValue", valueName)
+       require.Equal(t, []string{"tag1", "tag2"}, entityTagNames)
+       require.Equal(t, []int64{1, 2, 3}, values)
+       require.Equal(t, topNVal.entities, entities)
+}
+
+func TestTopNValue_Reset(t *testing.T) {
+       topNVal := &TopNValue{
+               valueName:      "testValue",
+               entityTagNames: []string{"tag1", "tag2"},
+               values:         []int64{1, 2, 3},
+               entities: [][]*modelv1.TagValue{
+                       {
+                               {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc1"}}},
+                       },
+               },
+               buf:             []byte{1, 2, 3},
+               entityValues:    [][]byte{{1, 2}},
+               entityValuesBuf: [][]byte{{3, 4}},
+       }
+       topNVal.Reset()
+       require.Empty(t, topNVal.valueName)
+       require.Empty(t, topNVal.entityTagNames)
+       require.Empty(t, topNVal.values)
+       require.Empty(t, topNVal.entities)
+       require.Empty(t, topNVal.buf)
+       require.Empty(t, topNVal.entityValues)
+       require.Empty(t, topNVal.entityValuesBuf)
+       require.Equal(t, int64(0), topNVal.firstValue)
+}
+
+func TestTopNValue_ResizeEntityValues(t *testing.T) {
+       topNVal := &TopNValue{}
+       result := topNVal.resizeEntityValues(5)
+       require.Len(t, result, 5)
+       require.Len(t, topNVal.entityValues, 5)
+       result2 := topNVal.resizeEntityValues(3)
+       require.Len(t, result2, 3)
+       require.Len(t, topNVal.entityValues, 3)
+       result3 := topNVal.resizeEntityValues(10)
+       require.Len(t, result3, 10)
+       require.Len(t, topNVal.entityValues, 10)
+}
+
+func TestTopNValue_ResizeEntities(t *testing.T) {
+       topNVal := &TopNValue{}
+       result := topNVal.resizeEntities(3, 2)
+       require.Len(t, result, 3)
+       require.Len(t, topNVal.entities, 3)
+       for i := range result {
+               require.Len(t, result[i], 0, "entities should be reset to length 0")
+               require.Equal(t, 2, cap(result[i]), "entities should have capacity 2")
+       }
+}
+
+func TestDataPointWithEntityValues_IntFieldValue(t *testing.T) {
+       tests := []struct {
+               fieldIndex    map[string]int
+               name          string
+               fieldName     string
+               fields        []*modelv1.FieldValue
+               expectedValue int64
+       }{
+               {
+                       name:          "field exists",
+                       fieldIndex:    map[string]int{"field1": 0},
+                       fields:        []*modelv1.FieldValue{{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 100}}}},
+                       fieldName:     "field1",
+                       expectedValue: 100,
+               },
+               {
+                       name:          "field not in index",
+                       fieldIndex:    map[string]int{"field1": 0},
+                       fields:        []*modelv1.FieldValue{{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 100}}}},
+                       fieldName:     "field2",
+                       expectedValue: 0,
+               },
+               {
+                       name:          "field index out of range",
+                       fieldIndex:    map[string]int{"field1": 5},
+                       fields:        []*modelv1.FieldValue{{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 100}}}},
+                       fieldName:     "field1",
+                       expectedValue: 0,
+               },
+               {
+                       name:          "field is not int type",
+                       fieldIndex:    map[string]int{"field1": 0},
+                       fields:        []*modelv1.FieldValue{{Value: &modelv1.FieldValue_Float{Float: &modelv1.Float{Value: 3.14}}}},
+                       fieldName:     "field1",
+                       expectedValue: 0,
+               },
+               {
+                       name:          "nil fieldIndex",
+                       fieldIndex:    nil,
+                       fields:        []*modelv1.FieldValue{{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 100}}}},
+                       fieldName:     "field1",
+                       expectedValue: 0,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       dp := &dataPointWithEntityValues{
+                               DataPointValue: &measurev1.DataPointValue{
+                                       Fields: tt.fields,
+                               },
+                               fieldIndex: tt.fieldIndex,
+                       }
+                       result := dp.intFieldValue(tt.fieldName, nil)
+                       require.Equal(t, tt.expectedValue, result)
+               })
+       }
+}
+
+func TestDataPointWithEntityValues_TagValue(t *testing.T) {
+       tests := []struct {
+               tagSpec      logical.TagSpecRegistry
+               name         string
+               tagName      string
+               tagFamilies  []*modelv1.TagFamilyForWrite
+               expectedNull bool
+       }{
+               {
+                       name: "tag found",
+                       tagSpec: func() logical.TagSpecRegistry {
+                               tagSpecMap := logical.TagSpecMap{}
+                               tagSpecMap.RegisterTag(0, 0, &databasev1.TagSpec{Name: "tag1"})
+                               return tagSpecMap
+                       }(),
+                       tagFamilies: []*modelv1.TagFamilyForWrite{
+                               {
+                                       Tags: []*modelv1.TagValue{
+                                               {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "value1"}}},
+                                       },
+                               },
+                       },
+                       tagName:      "tag1",
+                       expectedNull: false,
+               },
+               {
+                       name:         "tag not found in spec",
+                       tagSpec:      logical.TagSpecMap{},
+                       tagFamilies:  []*modelv1.TagFamilyForWrite{},
+                       tagName:      "tag1",
+                       expectedNull: true,
+               },
+               {
+                       name: "tag family index out of range",
+                       tagSpec: func() logical.TagSpecRegistry {
+                               tagSpecMap := logical.TagSpecMap{}
+                               tagSpecMap.RegisterTag(5, 0, &databasev1.TagSpec{Name: "tag1"})
+                               return tagSpecMap
+                       }(),
+                       tagFamilies: []*modelv1.TagFamilyForWrite{
+                               {
+                                       Tags: []*modelv1.TagValue{
+                                               {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "value1"}}},
+                                       },
+                               },
+                       },
+                       tagName:      "tag1",
+                       expectedNull: true,
+               },
+               {
+                       name: "tag index out of range",
+                       tagSpec: func() logical.TagSpecRegistry {
+                               tagSpecMap := logical.TagSpecMap{}
+                               tagSpecMap.RegisterTag(0, 5, &databasev1.TagSpec{Name: "tag1"})
+                               return tagSpecMap
+                       }(),
+                       tagFamilies: []*modelv1.TagFamilyForWrite{
+                               {
+                                       Tags: []*modelv1.TagValue{
+                                               {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "value1"}}},
+                                       },
+                               },
+                       },
+                       tagName:      "tag1",
+                       expectedNull: true,
+               },
+               {
+                       name:         "nil tagSpec",
+                       tagSpec:      nil,
+                       tagFamilies:  []*modelv1.TagFamilyForWrite{},
+                       tagName:      "tag1",
+                       expectedNull: true,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       dp := &dataPointWithEntityValues{
+                               DataPointValue: &measurev1.DataPointValue{
+                                       TagFamilies: tt.tagFamilies,
+                               },
+                               tagSpec: tt.tagSpec,
+                       }
+                       result := dp.tagValue(tt.tagName)
+                       if tt.expectedNull {
+                               require.Equal(t, pbv1.NullTagValue, result)
+                       } else {
+                               require.NotEqual(t, pbv1.NullTagValue, result)
+                       }
+               })
+       }
+}
+
+func TestDataPointWithEntityValues_LocateSpecTag(t *testing.T) {
+       tests := []struct {
+               name       string
+               tagSpec    logical.TagSpecRegistry
+               tagName    string
+               wantFamily int
+               wantTag    int
+               wantOk     bool
+       }{
+               {
+                       name: "tag found",
+                       tagSpec: func() logical.TagSpecRegistry {
+                               tagSpecMap := logical.TagSpecMap{}
+                               tagSpecMap.RegisterTag(1, 2, &databasev1.TagSpec{Name: "tag1"})
+                               return tagSpecMap
+                       }(),
+                       tagName:    "tag1",
+                       wantFamily: 1,
+                       wantTag:    2,
+                       wantOk:     true,
+               },
+               {
+                       name:       "tag not found",
+                       tagSpec:    logical.TagSpecMap{},
+                       tagName:    "tag1",
+                       wantFamily: 0,
+                       wantTag:    0,
+                       wantOk:     false,
+               },
+               {
+                       name:       "nil tagSpec",
+                       tagSpec:    nil,
+                       tagName:    "tag1",
+                       wantFamily: 0,
+                       wantTag:    0,
+                       wantOk:     false,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       dp := &dataPointWithEntityValues{
+                               tagSpec: tt.tagSpec,
+                       }
+                       familyIdx, tagIdx, ok := dp.locateSpecTag(tt.tagName)
+                       require.Equal(t, tt.wantOk, ok)
+                       if ok {
+                               require.Equal(t, tt.wantFamily, familyIdx)
+                               require.Equal(t, tt.wantTag, tagIdx)
+                       }
+               })
+       }
+}
+
+func TestStringify(t *testing.T) {
+       tests := []struct {
+               name     string
+               tagValue *modelv1.TagValue
+               expected string
+       }{
+               {
+                       name:     "string value",
+                       tagValue: &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "test"}}},
+                       expected: "test",
+               },
+               {
+                       name:     "int value",
+                       tagValue: &modelv1.TagValue{Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: 12345}}},
+                       expected: "12345",
+               },
+               {
+                       name:     "binary data",
+                       tagValue: &modelv1.TagValue{Value: &modelv1.TagValue_BinaryData{BinaryData: []byte("test data")}},
+                       expected: base64.StdEncoding.EncodeToString([]byte("test data")),
+               },
+               {
+                       name:     "int array",
+                       tagValue: &modelv1.TagValue{Value: &modelv1.TagValue_IntArray{IntArray: &modelv1.IntArray{Value: []int64{1, 2, 3}}}},
+                       expected: "1,2,3",
+               },
+               {
+                       name:     "string array",
+                       tagValue: &modelv1.TagValue{Value: &modelv1.TagValue_StrArray{StrArray: &modelv1.StrArray{Value: []string{"a", "b", "c"}}}},
+                       expected: "a,b,c",
+               },
+               {
+                       name:     "empty string array",
+                       tagValue: &modelv1.TagValue{Value: &modelv1.TagValue_StrArray{StrArray: &modelv1.StrArray{Value: []string{}}}},
+                       expected: "",
+               },
+               {
+                       name:     "unknown type",
+                       tagValue: &modelv1.TagValue{Value: &modelv1.TagValue_Null{}},
+                       expected: "",
+               },
+               {
+                       name:     "nil value",
+                       tagValue: &modelv1.TagValue{},
+                       expected: "",
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       result := Stringify(tt.tagValue)
+                       require.Equal(t, tt.expected, result)
+               })
+       }
+}
+
+func TestGroupName(t *testing.T) {
+       tests := []struct {
+               name      string
+               expected  string
+               groupTags []string
+       }{
+               {
+                       name:      "single tag",
+                       groupTags: []string{"tag1"},
+                       expected:  "tag1",
+               },
+               {
+                       name:      "multiple tags",
+                       groupTags: []string{"tag1", "tag2", "tag3"},
+                       expected:  "tag1|tag2|tag3",
+               },
+               {
+                       name:      "empty tags",
+                       groupTags: []string{},
+                       expected:  "",
+               },
+               {
+                       name:      "nil tags",
+                       groupTags: nil,
+                       expected:  "",
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       result := GroupName(tt.groupTags)
+                       require.Equal(t, tt.expected, result)
+               })
+       }
+}
+
+func TestBuildFieldIndex(t *testing.T) {
+       tests := []struct {
+               spec     *measurev1.DataPointSpec
+               expected map[string]int
+               name     string
+       }{
+               {
+                       name: "multiple fields",
+                       spec: &measurev1.DataPointSpec{
+                               FieldNames: []string{"field1", "field2", "field3"},
+                       },
+                       expected: map[string]int{
+                               "field1": 0,
+                               "field2": 1,
+                               "field3": 2,
+                       },
+               },
+               {
+                       name:     "empty fields",
+                       spec:     &measurev1.DataPointSpec{FieldNames: []string{}},
+                       expected: map[string]int{},
+               },
+               {
+                       name:     "nil spec",
+                       spec:     nil,
+                       expected: map[string]int{},
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       result := buildFieldIndex(tt.spec, nil)
+                       require.Equal(t, tt.expected, result)
+               })
+       }
+}
+
+func TestBuildTagSpecRegistryFromSpec(t *testing.T) {
+       tests := []struct {
+               name    string
+               spec    *measurev1.DataPointSpec
+               tagName string
+               wantOk  bool
+               wantFam int
+               wantTag int
+       }{
+               {
+                       name: "single tag family",
+                       spec: &measurev1.DataPointSpec{
+                               TagFamilySpec: []*measurev1.TagFamilySpec{
+                                       {
+                                               TagNames: []string{"tag1", "tag2"},
+                                       },
+                               },
+                       },
+                       tagName: "tag1",
+                       wantOk:  true,
+                       wantFam: 0,
+                       wantTag: 0,
+               },
+               {
+                       name: "multiple tag families",
+                       spec: &measurev1.DataPointSpec{
+                               TagFamilySpec: []*measurev1.TagFamilySpec{
+                                       {
+                                               TagNames: []string{"tag1"},
+                                       },
+                                       {
+                                               TagNames: []string{"tag2", "tag3"},
+                                       },
+                               },
+                       },
+                       tagName: "tag3",
+                       wantOk:  true,
+                       wantFam: 1,
+                       wantTag: 1,
+               },
+               {
+                       name:    "nil spec",
+                       spec:    nil,
+                       tagName: "tag1",
+                       wantOk:  false,
+                       wantFam: 0,
+                       wantTag: 0,
+               },
+               {
+                       name: "tag not found",
+                       spec: &measurev1.DataPointSpec{
+                               TagFamilySpec: []*measurev1.TagFamilySpec{
+                                       {
+                                               TagNames: []string{"tag1"},
+                                       },
+                               },
+                       },
+                       tagName: "tag2",
+                       wantOk:  false,
+                       wantFam: 0,
+                       wantTag: 0,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       result := buildTagSpecRegistryFromSpec(tt.spec, nil)
+                       if tt.spec == nil {
+                               require.NotNil(t, result)
+                               return
+                       }
+                       tagSpec := result.FindTagSpecByName(tt.tagName)
+                       if tt.wantOk {
+                               require.NotNil(t, tagSpec)
+                               require.Equal(t, tt.wantFam, tagSpec.TagFamilyIdx)
+                               require.Equal(t, tt.wantTag, tagSpec.TagIdx)
+                       } else {
+                               require.Nil(t, tagSpec)
+                       }
+               })
+       }
+}
+
+func TestTopNValue_MarshalUnmarshal_EdgeCases(t *testing.T) {
+       decoder := generateColumnValuesDecoder()
+       defer releaseColumnValuesDecoder(decoder)
+
+       tests := []struct {
+               topNVal *TopNValue
+               name    string
+       }{
+               {
+                       name: "empty entityTagNames",
+                       topNVal: &TopNValue{
+                               valueName:      "testValue",
+                               entityTagNames: []string{},
+                               values:         []int64{1, 2, 3},
+                               entities: [][]*modelv1.TagValue{
+                                       {},
+                                       {},
+                                       {},
+                               },
+                       },
+               },
+               {
+                       name: "large values",
+                       topNVal: &TopNValue{
+                               valueName:      "testValue",
+                               entityTagNames: []string{"tag1"},
+                               values:         []int64{-9223372036854775808, 9223372036854775807, 0},
+                               entities: [][]*modelv1.TagValue{
+                                       {{Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: 1}}}},
+                                       {{Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: 2}}}},
+                                       {{Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: 3}}}},
+                               },
+                       },
+               },
+               {
+                       name: "many entity tag names",
+                       topNVal: &TopNValue{
+                               valueName:      "testValue",
+                               entityTagNames: []string{"tag1", "tag2", "tag3", "tag4", "tag5"},
+                               values:         []int64{1},
+                               entities: [][]*modelv1.TagValue{
+                                       {
+                                               {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "v1"}}},
+                                               {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "v2"}}},
+                                               {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "v3"}}},
+                                               {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "v4"}}},
+                                               {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "v5"}}},
+                                       },
+                               },
+                       },
+               },
+               {
+                       name: "different tag value types",
+                       topNVal: &TopNValue{
+                               valueName:      "testValue",
+                               entityTagNames: []string{"tag1", "tag2", "tag3"},
+                               values:         []int64{1, 2},
+                               entities: [][]*modelv1.TagValue{
+                                       {
+                                               {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "str"}}},
+                                               {Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: 42}}},
+                                               {Value: &modelv1.TagValue_BinaryData{BinaryData: []byte("binary")}},
+                                       },
+                                       {
+                                               {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "str2"}}},
+                                               {Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: 43}}},
+                                               {Value: &modelv1.TagValue_BinaryData{BinaryData: []byte("binary2")}},
+                                       },
+                               },
+                       },
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       originalValueName := tt.topNVal.valueName
+                       originalEntityTagNames := make([]string, len(tt.topNVal.entityTagNames))
+                       copy(originalEntityTagNames, tt.topNVal.entityTagNames)
+                       originalValues := make([]int64, len(tt.topNVal.values))
+                       copy(originalValues, tt.topNVal.values)
+
+                       originalEntities := make([][]*modelv1.TagValue, len(tt.topNVal.entities))
+                       for i, entityGroup := range tt.topNVal.entities {
+                               originalEntities[i] = make([]*modelv1.TagValue, len(entityGroup))
+                               for j, tagValue := range entityGroup {
+                                       originalEntities[i][j] = proto.Clone(tagValue).(*modelv1.TagValue)
+                               }
+                       }
+
+                       dst, err := tt.topNVal.marshal(nil)
+                       require.NoError(t, err)
+
+                       tt.topNVal.Reset()
+                       err = tt.topNVal.Unmarshal(dst, decoder)
+                       require.NoError(t, err)
+
+                       require.Equal(t, originalValueName, tt.topNVal.valueName)
+                       require.Equal(t, originalEntityTagNames, tt.topNVal.entityTagNames)
+                       require.Equal(t, originalValues, tt.topNVal.values)
+                       diff := cmp.Diff(originalEntities, tt.topNVal.entities, protocmp.Transform())
+                       require.True(t, diff == "", "entities differ: %s", diff)
+               })
+       }
+}
+
+func TestTopNValue_Unmarshal_InvalidEntityLength(t *testing.T) {
+       decoder := generateColumnValuesDecoder()
+       defer releaseColumnValuesDecoder(decoder)
+
+       topNVal := &TopNValue{
+               valueName:      "testValue",
+               entityTagNames: []string{"tag1", "tag2"},
+               values:         []int64{1},
+               entities: [][]*modelv1.TagValue{
+                       {
+                               {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc1"}}},
+                       },
+               },
+       }
+
+       dst, err := topNVal.marshal(nil)
+       require.NoError(t, err)
+
+       topNVal.Reset()
+       err = topNVal.Unmarshal(dst, decoder)
+       require.Error(t, err)
+       require.Contains(t, err.Error(), "entityValues")
+}
+
+func TestNewDataPointWithEntityValues(t *testing.T) {
+       dp := &measurev1.DataPointValue{
+               Timestamp: timestamppb.Now(),
+               Fields: []*modelv1.FieldValue{
+                       {Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 100}}},
+               },
+               TagFamilies: []*modelv1.TagFamilyForWrite{
+                       {
+                               Tags: []*modelv1.TagValue{
+                                       {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "tag1"}}},
+                               },
+                       },
+               },
+       }
+       entityValues := []*modelv1.TagValue{
+               {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "entity1"}}},
+       }
+       spec := &measurev1.DataPointSpec{
+               FieldNames: []string{"field1"},
+               TagFamilySpec: []*measurev1.TagFamilySpec{
+                       {
+                               TagNames: []string{"tag1"},
+                       },
+               },
+       }
+
+       result := newDataPointWithEntityValues(dp, entityValues, 123, 456, spec, nil)
+       require.NotNil(t, result)
+       require.Equal(t, dp, result.DataPointValue)
+       require.Equal(t, entityValues, result.entityValues)
+       require.Equal(t, uint64(123), result.seriesID)
+       require.Equal(t, uint32(456), result.shardID)
+       require.NotNil(t, result.fieldIndex)
+       require.Equal(t, 0, result.fieldIndex["field1"])
+       require.NotNil(t, result.tagSpec)
+}
diff --git a/banyand/measure/write_liaison.go b/banyand/measure/write_liaison.go
index 33cd94f5..4f855b86 100644
--- a/banyand/measure/write_liaison.go
+++ b/banyand/measure/write_liaison.go
@@ -238,6 +238,6 @@ func (w *writeQueueCallback) handle(dst map[string]*dataPointsInQueue,
        if err != nil {
                return nil, err
        }
-       w.schemaRepo.inFlow(stm.GetSchema(), sid, writeEvent.ShardId, writeEvent.EntityValues, req.DataPoint)
+       w.schemaRepo.inFlow(stm.GetSchema(), sid, writeEvent.ShardId, writeEvent.EntityValues, req.DataPoint, spec)
        return dst, nil
 }
diff --git a/banyand/measure/write_standalone.go b/banyand/measure/write_standalone.go
index 44872e3e..08c8b380 100644
--- a/banyand/measure/write_standalone.go
+++ b/banyand/measure/write_standalone.go
@@ -199,7 +199,7 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me
        if err != nil {
                return nil, err
        }
-       w.schemaRepo.inFlow(stm.GetSchema(), sid, writeEvent.ShardId, writeEvent.EntityValues, req.DataPoint)
+       w.schemaRepo.inFlow(stm.GetSchema(), sid, writeEvent.ShardId, writeEvent.EntityValues, req.DataPoint, spec)
        return dst, nil
 }
 

