This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch topn in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 8b1186b8f6e14a48d0d7fef85ba0bf36642658b2 Author: Gao Hongtao <[email protected]> AuthorDate: Wed Dec 11 07:42:27 2024 +0800 Fix top-n high cardinality Signed-off-by: Gao Hongtao <[email protected]> --- CHANGES.md | 1 + banyand/measure/topn.go | 419 ++++++++++++++++------- banyand/measure/topn_test.go | 91 +++++ banyand/measure/write.go | 2 +- banyand/query/processor_topn.go | 22 +- pkg/partition/index.go | 3 + pkg/pb/v1/series.go | 7 +- pkg/pb/v1/value.go | 33 ++ pkg/query/logical/measure/topn_analyzer.go | 132 +++---- pkg/query/logical/measure/topn_plan_localscan.go | 275 +++++++++------ pkg/schema/cache.go | 7 +- pkg/schema/init.go | 178 +++------- pkg/schema/schema.go | 3 + test/cases/topn/data/want/aggr_desc.yaml | 5 +- 14 files changed, 746 insertions(+), 432 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index cc1ca646..c3b8cf1e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -11,6 +11,7 @@ Release Notes. - Index: Support InsertIfAbsent functionality which ensures documents are only inserted if their docIDs are not already present in the current index. There is a exception for the documents with extra index fields more than the entity's index fields. - Measure: Introduce "index_mode" to save data exclusively in the series index, ideal for non-timeseries measures. - Index: Use numeric index type to support Int and Float +- TopN: Group top-N pre-calculation results by the group key in the newly introduced `_top_n_result` measure, which is used to store the pre-calculation results.
### Bug Fixes diff --git a/banyand/measure/topn.go b/banyand/measure/topn.go index c64270aa..c7649781 100644 --- a/banyand/measure/topn.go +++ b/banyand/measure/topn.go @@ -18,6 +18,7 @@ package measure import ( + "bytes" "context" "encoding/base64" "fmt" @@ -32,7 +33,6 @@ import ( "golang.org/x/exp/slices" "google.golang.org/protobuf/types/known/timestamppb" - "github.com/apache/skywalking-banyandb/api/common" apiData "github.com/apache/skywalking-banyandb/api/data" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" @@ -40,13 +40,17 @@ import ( modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/pkg/bus" + "github.com/apache/skywalking-banyandb/pkg/convert" + "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/flow" "github.com/apache/skywalking-banyandb/pkg/flow/streaming" "github.com/apache/skywalking-banyandb/pkg/flow/streaming/sources" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/partition" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/query/logical" + "github.com/apache/skywalking-banyandb/pkg/schema" ) const ( @@ -65,18 +69,20 @@ type dataPointWithEntityValues struct { *measurev1.DataPointValue entityValues []*modelv1.TagValue seriesID uint64 + shardID uint32 } type topNStreamingProcessor struct { - m *measure + pipeline queue.Queue streamingFlow flow.Flow + in chan flow.StreamRecord l *logger.Logger - pipeline queue.Queue topNSchema *databasev1.TopNAggregation src chan interface{} - in chan flow.StreamRecord + m *measure errCh <-chan error stopCh chan struct{} + buf []byte flow.ComponentState interval time.Duration sortDirection modelv1.Sort 
@@ -137,134 +143,93 @@ func (t *topNStreamingProcessor) writeStreamRecord(record flow.StreamRecord) err var err error publisher := t.pipeline.NewBatchPublisher(resultPersistencyTimeout) defer publisher.Close() + topNValue := GenerateTopNValue() + defer ReleaseTopNValue(topNValue) for group, tuples := range tuplesGroups { if e := t.l.Debug(); e.Enabled() { - e.Str("TopN", t.topNSchema.GetMetadata().GetName()). - Str("group", group). - Int("rankNums", len(tuples)). - Msg("Write tuples") + for i := range tuples { + tuple := tuples[i] + data := tuple.V2.(flow.StreamRecord).Data().(flow.Data) + e. + Int("rankNums", i+1). + Str("entityValues", fmt.Sprintf("%v", data[0])). + Int("value", int(data[2].(int64))). + Time("eventTime", eventTime). + Msgf("Write tuples %s %s", t.topNSchema.GetMetadata().GetName(), group) + } } - for rankNum, tuple := range tuples { - fieldValue := tuple.V1.(int64) + topNValue.Reset() + topNValue.setMetadata(t.topNSchema.GetFieldName(), t.m.schema.Entity.TagNames) + var shardID uint32 + for _, tuple := range tuples { data := tuple.V2.(flow.StreamRecord).Data().(flow.Data) - err = multierr.Append(err, t.writeData(publisher, eventTime, fieldValue, data, rankNum)) + topNValue.addValue( + tuple.V1.(int64), + data[0].([]*modelv1.TagValue), + ) + shardID = data[3].(uint32) } - } - return err -} - -func (t *topNStreamingProcessor) writeData(publisher queue.BatchPublisher, eventTime time.Time, fieldValue int64, - data flow.Data, rankNum int, -) error { - var tagValues []*modelv1.TagValue - if len(t.topNSchema.GetGroupByTagNames()) > 0 { - var ok bool - if tagValues, ok = data[3].([]*modelv1.TagValue); !ok { - return errors.New("fail to extract tag values from topN result") + entityValues := []*modelv1.TagValue{ + { + Value: &modelv1.TagValue_Str{ + Str: &modelv1.Str{ + Value: t.topNSchema.GetMetadata().GetName(), + }, + }, + }, + { + Value: &modelv1.TagValue_Int{ + Int: &modelv1.Int{ + Value: int64(t.sortDirection), + }, + }, + }, + { + Value: 
&modelv1.TagValue_Str{ + Str: &modelv1.Str{ + Value: group, + }, + }, + }, } - } - for _, tv := range tagValues { - if tv.Value == nil { - t.l.Warn().Msg("tag value is nil") + t.buf = t.buf[:0] + if t.buf, err = topNValue.marshal(t.buf); err != nil { + return err } - } - series, shardID, err := t.locate(tagValues, rankNum) - if err != nil { - return err - } - - iwr := &measurev1.InternalWriteRequest{ - Request: &measurev1.WriteRequest{ - MessageId: uint64(time.Now().UnixNano()), - Metadata: t.topNSchema.GetMetadata(), - DataPoint: &measurev1.DataPointValue{ - Timestamp: timestamppb.New(eventTime), - TagFamilies: []*modelv1.TagFamilyForWrite{ - { - Tags: append( - data[0].([]*modelv1.TagValue), - // SortDirection - &modelv1.TagValue{ - Value: &modelv1.TagValue_Int{ - Int: &modelv1.Int{ - Value: int64(t.sortDirection), - }, - }, - }, - // RankNumber - &modelv1.TagValue{ - Value: &modelv1.TagValue_Int{ - Int: &modelv1.Int{ - Value: int64(rankNum), - }, - }, - }, - ), + iwr := &measurev1.InternalWriteRequest{ + Request: &measurev1.WriteRequest{ + MessageId: uint64(time.Now().UnixNano()), + Metadata: &commonv1.Metadata{Name: schema.TopNSchemaName, Group: t.topNSchema.GetMetadata().Group}, + DataPoint: &measurev1.DataPointValue{ + Timestamp: timestamppb.New(eventTime), + TagFamilies: []*modelv1.TagFamilyForWrite{ + {Tags: entityValues}, }, - }, - Fields: []*modelv1.FieldValue{ - { - Value: &modelv1.FieldValue_Int{ - Int: &modelv1.Int{ - Value: fieldValue, + Fields: []*modelv1.FieldValue{ + { + Value: &modelv1.FieldValue_BinaryData{ + BinaryData: bytes.Clone(t.buf), }, }, }, }, }, - }, - EntityValues: series.EntityValues, - ShardId: uint32(shardID), + EntityValues: entityValues, + ShardId: shardID, + } + message := bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), "local", iwr) + _, err = publisher.Publish(context.TODO(), apiData.TopicMeasureWrite, message) + if err != nil { + return err + } } - message := 
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), "local", iwr) - _, errWritePub := publisher.Publish(context.TODO(), apiData.TopicMeasureWrite, message) - return errWritePub + return err } func (t *topNStreamingProcessor) downSampleTimeBucket(eventTimeMillis int64) time.Time { return time.UnixMilli(eventTimeMillis - eventTimeMillis%t.interval.Milliseconds()) } -func (t *topNStreamingProcessor) locate(tagValues []*modelv1.TagValue, rankNum int) (*pbv1.Series, common.ShardID, error) { - if len(tagValues) != 0 && len(t.topNSchema.GetGroupByTagNames()) != len(tagValues) { - return nil, 0, errors.New("no enough tag values for the entity") - } - series := &pbv1.Series{ - Subject: t.topNSchema.GetMetadata().GetName(), - EntityValues: make([]*modelv1.TagValue, 1+1+len(tagValues)), - } - - copy(series.EntityValues, tagValues) - series.EntityValues[len(series.EntityValues)-2] = &modelv1.TagValue{ - Value: &modelv1.TagValue_Int{ - Int: &modelv1.Int{ - Value: int64(t.sortDirection), - }, - }, - } - series.EntityValues[len(series.EntityValues)-1] = &modelv1.TagValue{ - Value: &modelv1.TagValue_Int{ - Int: &modelv1.Int{ - Value: int64(rankNum), - }, - }, - } - if err := series.Marshal(); err != nil { - return nil, 0, fmt.Errorf("fail to marshal series: %w", err) - } - src := make([]byte, len(series.Buffer)) - copy(src, series.Buffer) - var s1 pbv1.Series - if err := s1.Unmarshal(src); err != nil { - return nil, 0, fmt.Errorf("fail to unmarshal series encoded:[%s] tagValues:[%s]: %w", series.Buffer, series.EntityValues, err) - } - id, err := partition.ShardID(series.Buffer, t.m.shardNum) - if err != nil { - return nil, 0, err - } - return series, common.ShardID(id), nil -} - func (t *topNStreamingProcessor) start() *topNStreamingProcessor { flushInterval := t.interval if flushInterval > maxFlushInterval { @@ -326,7 +291,7 @@ func (manager *topNProcessorManager) Close() error { return err } -func (manager *topNProcessorManager) onMeasureWrite(seriesID uint64, 
request *measurev1.InternalWriteRequest) { +func (manager *topNProcessorManager) onMeasureWrite(seriesID uint64, shardID uint32, request *measurev1.InternalWriteRequest) { go func() { manager.RLock() defer manager.RUnlock() @@ -336,6 +301,7 @@ func (manager *topNProcessorManager) onMeasureWrite(seriesID uint64, request *me request.GetRequest().GetDataPoint(), request.GetEntityValues(), seriesID, + shardID, }, request.GetRequest().GetDataPoint().GetTimestamp()) } } @@ -381,6 +347,7 @@ func (manager *topNProcessorManager) start() error { stopCh: make(chan struct{}), streamingFlow: streamingFlow, pipeline: manager.pipeline, + buf: make([]byte, 0, 64), } processorList[i] = processor.start() } @@ -439,8 +406,9 @@ func (manager *topNProcessorManager) buildMapper(fieldName string, groupByNames "", // field value as v2 dpWithEvs.GetFields()[fieldIdx].GetInt().GetValue(), - // groupBy tag values as v3 - nil, + // shardID values as v3 + dpWithEvs.shardID, + // seriesID values as v4 dpWithEvs.seriesID, } }, nil @@ -455,15 +423,14 @@ func (manager *topNProcessorManager) buildMapper(fieldName string, groupByNames // EntityValues as identity dpWithEvs.entityValues, // save string representation of group values as the key, i.e. 
v1 - strings.Join(transform(groupLocator, func(locator partition.TagLocator) string { - return stringify(extractTagValue(dpWithEvs.DataPointValue, locator)) - }), "|"), + GroupName(transform(groupLocator, func(locator partition.TagLocator) string { + return Stringify(extractTagValue(dpWithEvs.DataPointValue, locator)) + })), // field value as v2 dpWithEvs.GetFields()[fieldIdx].GetInt().GetValue(), - // groupBy tag values as v3 - transform(groupLocator, func(locator partition.TagLocator) *modelv1.TagValue { - return extractTagValue(dpWithEvs.DataPointValue, locator) - }), + // shardID values as v3 + dpWithEvs.shardID, + // seriesID values as v4 dpWithEvs.seriesID, } }, nil @@ -499,7 +466,8 @@ func extractTagValue(dpv *measurev1.DataPointValue, locator partition.TagLocator return tagFamily.GetTags()[locator.TagOffset] } -func stringify(tagValue *modelv1.TagValue) string { +// Stringify converts a TagValue to a string. +func Stringify(tagValue *modelv1.TagValue) string { switch v := tagValue.GetValue().(type) { case *modelv1.TagValue_Str: return v.Str.GetValue() @@ -525,3 +493,208 @@ func transform[I, O any](input []I, mapper func(I) O) []O { } return output } + +// GenerateTopNValue returns a new TopNValue instance. +func GenerateTopNValue() *TopNValue { + v := topNValuePool.Get() + if v == nil { + return &TopNValue{} + } + return v +} + +// ReleaseTopNValue releases a TopNValue instance. +func ReleaseTopNValue(bi *TopNValue) { + bi.Reset() + topNValuePool.Put(bi) +} + +var topNValuePool = pool.Register[*TopNValue]("measure-topNValue") + +// TopNValue represents the topN value. 
+type TopNValue struct { + valueName string + entityTagNames []string + values []int64 + entities [][]*modelv1.TagValue + entityValues [][]byte + buf []byte + encodeType encoding.EncodeType + firstValue int64 +} + +func (t *TopNValue) setMetadata(valueName string, entityTagNames []string) { + t.valueName = valueName + t.entityTagNames = entityTagNames +} + +func (t *TopNValue) addValue(value int64, entityValues []*modelv1.TagValue) { + t.values = append(t.values, value) + t.entities = append(t.entities, entityValues) +} + +// Values returns the valueName, entityTagNames, values, and entities. +func (t *TopNValue) Values() (string, []string, []int64, [][]*modelv1.TagValue) { + return t.valueName, t.entityTagNames, t.values, t.entities +} + +// Reset resets the TopNValue. +func (t *TopNValue) Reset() { + t.valueName = "" + t.entityTagNames = t.entityTagNames[:0] + t.values = t.values[:0] + for i := range t.entities { + t.entities[i] = t.entities[i][:0] + } + t.entities = t.entities[:0] + t.buf = t.buf[:0] + t.encodeType = encoding.EncodeTypeUnknown + t.firstValue = 0 + for i := range t.entityValues { + t.entityValues[i] = t.entityValues[i][:0] + } + t.entityValues = t.entityValues[:0] +} + +func (t *TopNValue) resizeEntityValues(size int) [][]byte { + entityValues := t.entityValues + if n := size - cap(entityValues); n > 0 { + entityValues = append(entityValues[:cap(entityValues)], make([][]byte, n)...) + } + t.entityValues = entityValues[:size] + return t.entityValues +} + +func (t *TopNValue) resizeEntities(size int, entitySize int) [][]*modelv1.TagValue { + entities := t.entities + if n := size - cap(t.entities); n > 0 { + entities = append(entities[:cap(entities)], make([][]*modelv1.TagValue, n)...) + } + t.entities = entities[:size] + for i := range t.entities { + entity := t.entities[i] + if n := entitySize - cap(entity); n > 0 { + entity = append(entity[:cap(entity)], make([]*modelv1.TagValue, n)...) 
+ } + t.entities[i] = entity[:0] + } + return t.entities +} + +func (t *TopNValue) marshal(dst []byte) ([]byte, error) { + if len(t.values) == 0 { + return nil, errors.New("values is empty") + } + dst = encoding.EncodeBytes(dst, convert.StringToBytes(t.valueName)) + dst = encoding.VarUint64ToBytes(dst, uint64(len(t.entityTagNames))) + for _, entityTagName := range t.entityTagNames { + dst = encoding.EncodeBytes(dst, convert.StringToBytes(entityTagName)) + } + + dst = encoding.VarUint64ToBytes(dst, uint64(len(t.values))) + + t.buf, t.encodeType, t.firstValue = encoding.Int64ListToBytes(t.buf, t.values) + dst = append(dst, byte(t.encodeType)) + dst = encoding.VarInt64ToBytes(dst, t.firstValue) + dst = encoding.VarUint64ToBytes(dst, uint64(len(t.buf))) + dst = append(dst, t.buf...) + + var err error + t.entityValues = t.resizeEntityValues(len(t.entities)) + for i, tvv := range t.entities { + t.entityValues[i], err = pbv1.MarshalTagValues(t.entityValues[i], tvv) + if err != nil { + return nil, err + } + } + dst = encoding.EncodeBytesBlock(dst, t.entityValues) + return dst, nil +} + +// Unmarshal unmarshals the TopNValue from the src. 
+func (t *TopNValue) Unmarshal(src []byte, decoder *encoding.BytesBlockDecoder) error { + var err error + src, nameBytes, err := encoding.DecodeBytes(src) + if err != nil { + return fmt.Errorf("cannot unmarshal topNValue.name: %w", err) + } + t.valueName = convert.BytesToString(nameBytes) + + var entityTagNamesCount uint64 + src, entityTagNamesCount, err = encoding.BytesToVarUint64(src) + if err != nil { + return fmt.Errorf("cannot unmarshal topNValue.entityTagNamesCount: %w", err) + } + t.entityTagNames = make([]string, 0, entityTagNamesCount) + var entityTagNameBytes []byte + for i := uint64(0); i < entityTagNamesCount; i++ { + src, entityTagNameBytes, err = encoding.DecodeBytes(src) + if err != nil { + return fmt.Errorf("cannot unmarshal topNValue.entityTagName: %w", err) + } + t.entityTagNames = append(t.entityTagNames, convert.BytesToString(entityTagNameBytes)) + } + + var valuesCount uint64 + src, valuesCount, err = encoding.BytesToVarUint64(src) + if err != nil { + return fmt.Errorf("cannot unmarshal topNValue.valuesCount: %w", err) + } + + if len(src) < 1 { + return fmt.Errorf("cannot unmarshal topNValue.encodeType: src is too short") + } + t.encodeType = encoding.EncodeType(src[0]) + src = src[1:] + + if len(src) < 1 { + return fmt.Errorf("cannot unmarshal topNValue.firstValue: src is too short") + } + src, t.firstValue, err = encoding.BytesToVarInt64(src) + if err != nil { + return fmt.Errorf("cannot unmarshal topNValue.firstValue: %w", err) + } + if len(src) < 1 { + return fmt.Errorf("cannot unmarshal topNValue.valueLen: src is too short") + } + var valueLen uint64 + src, valueLen, err = encoding.BytesToVarUint64(src) + if err != nil { + return fmt.Errorf("cannot unmarshal topNValue.valueLen: %w", err) + } + + if uint64(len(src)) < valueLen { + return fmt.Errorf("src is too short for reading string with size %d; len(src)=%d", valueLen, len(src)) + } + + t.values, err = encoding.BytesToInt64List(t.values, src[:valueLen], t.encodeType, t.firstValue, 
int(valuesCount)) + if err != nil { + return fmt.Errorf("cannot unmarshal topNValue.values: %w", err) + } + + if uint64(len(src)) < valueLen { + return fmt.Errorf("src is too short for reading string with size %d; len(src)=%d", valueLen, len(src)) + } + + decoder.Reset() + t.entityValues, err = decoder.Decode(t.entityValues, src[valueLen:], valuesCount) + if err != nil { + return fmt.Errorf("cannot unmarshal topNValue.entityValues: %w", err) + } + t.resizeEntities(len(t.entityValues), int(entityTagNamesCount)) + for i, ev := range t.entityValues { + t.buf, t.entities[i], err = pbv1.UnmarshalTagValues(t.buf, t.entities[i], ev) + if err != nil { + return fmt.Errorf("cannot unmarshal topNValue.entityValues[%d]: %w", i, err) + } + if len(t.entities[i]) != len(t.entityTagNames) { + return fmt.Errorf("entityValues[%d] length is not equal to entityTagNames", i) + } + } + return nil +} + +// GroupName returns the group name. +func GroupName(groupTags []string) string { + return strings.Join(groupTags, "|") +} diff --git a/banyand/measure/topn_test.go b/banyand/measure/topn_test.go new file mode 100644 index 00000000..b416ec6b --- /dev/null +++ b/banyand/measure/topn_test.go @@ -0,0 +1,91 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package measure + +import ( + "testing" + + "github.com/stretchr/testify/require" + + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" +) + +func TestTopNValue_MarshalUnmarshal(t *testing.T) { + tests := []struct { + topNVal *TopNValue + name string + }{ + { + name: "simple case", + topNVal: &TopNValue{ + valueName: "testValue", + entityTagNames: []string{"tag1", "tag2"}, + values: []int64{1, 2, 3}, + entities: [][]*modelv1.TagValue{ + { + {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc1"}}}, + {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "entity1"}}}, + }, + { + {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc1"}}}, + {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "entity2"}}}, + }, + { + {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc1"}}}, + {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "entity3"}}}, + }, + }, + }, + }, + { + name: "single", + topNVal: &TopNValue{ + valueName: "testValue", + entityTagNames: []string{"tag1", "tag2"}, + values: []int64{1}, + entities: [][]*modelv1.TagValue{ + { + {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc1"}}}, + {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "entity1"}}}, + }, + }, + }, + }, + } + decoder := generateColumnValuesDecoder() + defer releaseColumnValuesDecoder(decoder) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Marshal the topNValue + dst, err := tt.topNVal.marshal(nil) + require.NoError(t, err) + + // Unmarshal the topNValue + var decodedTopNValue TopNValue + err = decodedTopNValue.Unmarshal(dst, decoder) + require.NoError(t, err) + + // Compare the original and decoded topNValue + require.Equal(t, tt.topNVal.valueName, decodedTopNValue.valueName) + require.Equal(t, tt.topNVal.entityTagNames, decodedTopNValue.entityTagNames) + require.Equal(t, tt.topNVal.values, 
decodedTopNValue.values) + require.Equal(t, tt.topNVal.entities, decodedTopNValue.entities) + }) + } +} diff --git a/banyand/measure/write.go b/banyand/measure/write.go index b1e9810b..859cf09a 100644 --- a/banyand/measure/write.go +++ b/banyand/measure/write.go @@ -145,7 +145,7 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me dpt.dataPoints.fields = append(dpt.dataPoints.fields, field) if stm.processorManager != nil { - stm.processorManager.onMeasureWrite(uint64(series.ID), &measurev1.InternalWriteRequest{ + stm.processorManager.onMeasureWrite(uint64(series.ID), uint32(shardID), &measurev1.InternalWriteRequest{ Request: &measurev1.WriteRequest{ Metadata: stm.GetSchema().Metadata, DataPoint: req.DataPoint, diff --git a/banyand/query/processor_topn.go b/banyand/query/processor_topn.go index e042df76..234230f2 100644 --- a/banyand/query/processor_topn.go +++ b/banyand/query/processor_topn.go @@ -40,6 +40,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/query/aggregation" "github.com/apache/skywalking-banyandb/pkg/query/executor" logical_measure "github.com/apache/skywalking-banyandb/pkg/query/logical/measure" + pkgschema "github.com/apache/skywalking-banyandb/pkg/schema" ) type topNQueryProcessor struct { @@ -94,21 +95,7 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp return } - topNResultMeasure, err := t.measureService.Measure(topNMetadata) - if err != nil { - t.log.Error().Err(err). - Str("topN", topNMetadata.GetName()). - Msg("fail to find topN result measure") - return - } - - s, err := logical_measure.BuildTopNSchema(topNResultMeasure.GetSchema()) - if err != nil { - t.log.Error().Err(err). - Str("topN", topNMetadata.GetName()). 
- Msg("fail to build schema") - } - plan, err := logical_measure.TopNAnalyze(ctx, request, topNResultMeasure.GetSchema(), sourceMeasure.GetSchema(), s) + plan, err := logical_measure.TopNAnalyze(request, sourceMeasure.GetSchema(), topNSchema) if err != nil { resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to analyze the query request for topn %s: %v", topNMetadata.GetName(), err)) return @@ -117,6 +104,11 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp if e := ml.Debug(); e.Enabled() { e.Str("plan", plan.String()).Msg("topn plan") } + topNResultMeasure, err := t.measureService.Measure(pkgschema.GetTopNSchemaMetadata(topNMetadata.Group)) + if err != nil { + ml.Error().Err(err).Str("topN", topNMetadata.GetName()).Msg("fail to find topn result measure") + return + } var tracer *query.Tracer var span *query.Span if request.Trace { diff --git a/pkg/partition/index.go b/pkg/partition/index.go index bbb4daf0..f3c69833 100644 --- a/pkg/partition/index.go +++ b/pkg/partition/index.go @@ -49,6 +49,9 @@ func ParseIndexRuleLocators(entity *databasev1.Entity, families []*databasev1.Ta } findIndexRuleByTagName := func(tagName string) *databasev1.IndexRule { for i := range indexRules { + if indexRules[i] == nil { + continue + } for j := range indexRules[i].Tags { if indexRules[i].Tags[j] == tagName { return indexRules[i] diff --git a/pkg/pb/v1/series.go b/pkg/pb/v1/series.go index 542343d9..31a6f289 100644 --- a/pkg/pb/v1/series.go +++ b/pkg/pb/v1/series.go @@ -48,10 +48,9 @@ func (s *Series) CopyTo(dst *Series) { func (s *Series) Marshal() error { s.Buffer = marshalEntityValue(s.Buffer, convert.StringToBytes(s.Subject)) var err error - for _, tv := range s.EntityValues { - if s.Buffer, err = marshalTagValue(s.Buffer, tv); err != nil { - return errors.WithMessage(err, "marshal subject and entity values") - } + s.Buffer, err = MarshalTagValues(s.Buffer, s.EntityValues) + if err != nil { + return errors.WithMessage(err, "marshal 
subject and entity values") } s.ID = common.SeriesID(convert.Hash(s.Buffer)) return nil diff --git a/pkg/pb/v1/value.go b/pkg/pb/v1/value.go index 1e73e6c7..b9c6fa80 100644 --- a/pkg/pb/v1/value.go +++ b/pkg/pb/v1/value.go @@ -97,6 +97,33 @@ func MustTagValueToStr(tag *modelv1.TagValue) string { } } +// MarshalTagValues marshals tag values. +func MarshalTagValues(dest []byte, tags []*modelv1.TagValue) ([]byte, error) { + var err error + for _, tag := range tags { + dest, err = marshalTagValue(dest, tag) + if err != nil { + return nil, err + } + } + return dest, nil +} + +// UnmarshalTagValues unmarshals tag values. +func UnmarshalTagValues(dest []byte, destTags []*modelv1.TagValue, src []byte) ([]byte, []*modelv1.TagValue, error) { + var err error + var tag *modelv1.TagValue + for len(src) > 0 { + dest = dest[:0] + dest, src, tag, err = unmarshalTagValue(dest, src) + if err != nil { + return nil, nil, err + } + destTags = append(destTags, tag) + } + return dest, destTags, nil +} + func marshalTagValue(dest []byte, tv *modelv1.TagValue) ([]byte, error) { dest = append(dest, byte(MustTagValueToValueType(tv))) switch tv.Value.(type) { @@ -136,6 +163,9 @@ func unmarshalTagValue(dest []byte, src []byte) ([]byte, []byte, *modelv1.TagVal if dest, src, err = unmarshalEntityValue(dest, src[1:]); err != nil { return nil, nil, nil, errors.WithMessage(err, "unmarshal string tag value") } + if len(dest) == 0 { + return dest, src, NullTagValue, nil + } return dest, src, &modelv1.TagValue{ Value: &modelv1.TagValue_Str{ Str: &modelv1.Str{ @@ -158,6 +188,9 @@ func unmarshalTagValue(dest []byte, src []byte) ([]byte, []byte, *modelv1.TagVal if dest, src, err = unmarshalEntityValue(dest, src[1:]); err != nil { return nil, nil, nil, errors.WithMessage(err, "unmarshal binary tag value") } + if len(dest) == 0 { + return dest, src, NullTagValue, nil + } data := make([]byte, len(dest)) copy(data, dest) return dest, src, &modelv1.TagValue{ diff --git 
a/pkg/query/logical/measure/topn_analyzer.go b/pkg/query/logical/measure/topn_analyzer.go index 4d72787f..eed24da5 100644 --- a/pkg/query/logical/measure/topn_analyzer.go +++ b/pkg/query/logical/measure/topn_analyzer.go @@ -19,69 +19,49 @@ package measure import ( - "context" - "fmt" + "errors" - commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" - modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/query/logical" + pkgschema "github.com/apache/skywalking-banyandb/pkg/schema" ) -// BuildTopNSchema returns Schema loaded from the metadata repository. -func BuildTopNSchema(md *databasev1.Measure) (logical.Schema, error) { - md.GetEntity() - - ms := &schema{ - common: &logical.CommonSchema{ - TagSpecMap: make(map[string]*logical.TagSpec), - EntityList: md.GetEntity().GetTagNames(), - }, - measure: md, - fieldMap: make(map[string]*logical.FieldSpec), - } - - ms.common.RegisterTagFamilies(md.GetTagFamilies()) - - for fieldIdx, spec := range md.GetFields() { - ms.registerField(fieldIdx, spec) - } - - return ms, nil -} - // TopNAnalyze converts logical expressions to executable operation tree represented by Plan. -func TopNAnalyze(_ context.Context, criteria *measurev1.TopNRequest, schema *databasev1.Measure, - sourceMeasureSchema *databasev1.Measure, s logical.Schema, +func TopNAnalyze(criteria *measurev1.TopNRequest, sourceMeasureSchema *databasev1.Measure, topNAggSchema *databasev1.TopNAggregation, ) (logical.Plan, error) { - groupByProjectionTags := sourceMeasureSchema.GetEntity().GetTagNames() - groupByTags := make([][]*logical.Tag, len(schema.GetTagFamilies())) - tagFamily := schema.GetTagFamilies()[0] - groupByTags[0] = logical.NewTags(tagFamily.GetName(), groupByProjectionTags...) 
- - if len(schema.GetFields()) != 1 { - return nil, fmt.Errorf("topN schema fields count should be 1 but got %d", len(schema.GetFields())) - } - projectionFields := make([]*logical.Field, 1) - fieldName := schema.GetFields()[0].GetName() - projectionFields[0] = logical.NewField(fieldName) // parse fields - plan := parse(criteria, schema.GetMetadata(), projectionFields, groupByTags) + timeRange := criteria.GetTimeRange() + var plan logical.UnresolvedPlan + plan = &unresolvedLocalScan{ + name: criteria.Name, + startTime: timeRange.GetBegin().AsTime(), + endTime: timeRange.GetEnd().AsTime(), + conditions: criteria.Conditions, + sort: criteria.FieldValueSort, + groupByTags: topNAggSchema.GroupByTagNames, + } + s, err := buildVirtualSchema(sourceMeasureSchema, topNAggSchema.FieldName) + if err != nil { + return nil, err + } if criteria.GetAgg() != 0 { + groupByProjectionTags := sourceMeasureSchema.GetEntity().GetTagNames() + groupByTags := [][]*logical.Tag{logical.NewTags(pkgschema.TopNTagFamily, groupByProjectionTags...)} plan = newUnresolvedGroupBy(plan, groupByTags, false) plan = newUnresolvedAggregation(plan, - projectionFields[0], + &logical.Field{Name: topNAggSchema.FieldName}, criteria.GetAgg(), true) } plan = top(plan, &measurev1.QueryRequest_Top{ Number: criteria.GetTopN(), - FieldName: fieldName, + FieldName: topNAggSchema.FieldName, FieldValueSort: criteria.GetFieldValueSort(), }) + p, err := plan.Analyze(s) if err != nil { return nil, err @@ -89,26 +69,54 @@ func TopNAnalyze(_ context.Context, criteria *measurev1.TopNRequest, schema *dat return p, nil } -func parse(criteria *measurev1.TopNRequest, metadata *commonv1.Metadata, - projFields []*logical.Field, projTags [][]*logical.Tag, -) logical.UnresolvedPlan { - timeRange := criteria.GetTimeRange() - return local(timeRange.GetBegin().AsTime(), timeRange.GetEnd().AsTime(), - metadata, projTags, projFields, buildConditions(criteria), criteria.GetFieldValueSort()) -} - -func buildConditions(criteria 
*measurev1.TopNRequest) []*modelv1.Condition { - return append([]*modelv1.Condition{ - { - Name: "sortDirection", - Op: modelv1.Condition_BINARY_OP_EQ, - Value: &modelv1.TagValue{ - Value: &modelv1.TagValue_Int{ - Int: &modelv1.Int{ - Value: int64(criteria.GetFieldValueSort().Number()), - }, - }, +func buildVirtualSchema(sourceMeasureSchema *databasev1.Measure, fieldName string) (logical.Schema, error) { + var tags []*databasev1.TagSpec + for _, tag := range sourceMeasureSchema.GetEntity().TagNames { + for i := range sourceMeasureSchema.GetTagFamilies() { + for j := range sourceMeasureSchema.GetTagFamilies()[i].Tags { + if sourceMeasureSchema.GetTagFamilies()[i].Tags[j].Name == tag { + tags = append(tags, sourceMeasureSchema.GetTagFamilies()[i].Tags[j]) + if len(tags) == len(sourceMeasureSchema.GetEntity().TagNames) { + break + } + } + } + } + } + if len(tags) != len(sourceMeasureSchema.GetEntity().TagNames) { + return nil, errors.New("failed to build topn schema, source measure schema is invalid:" + sourceMeasureSchema.String()) + } + var fields []*databasev1.FieldSpec + for _, field := range sourceMeasureSchema.GetFields() { + if field.GetName() == fieldName { + fields = append(fields, field) + break + } + } + md := &databasev1.Measure{ + Metadata: sourceMeasureSchema.Metadata, + TagFamilies: []*databasev1.TagFamilySpec{ + { + Name: pkgschema.TopNTagFamily, + Tags: tags, }, }, - }, criteria.GetConditions()...) 
+ Fields: fields, + Entity: &databasev1.Entity{ + TagNames: sourceMeasureSchema.Entity.TagNames, + }, + } + ms := &schema{ + common: &logical.CommonSchema{ + TagSpecMap: make(map[string]*logical.TagSpec), + EntityList: md.GetEntity().GetTagNames(), + }, + measure: md, + fieldMap: make(map[string]*logical.FieldSpec), + } + ms.common.RegisterTagFamilies(md.GetTagFamilies()) + for fieldIdx, spec := range md.GetFields() { + ms.registerField(fieldIdx, spec) + } + return ms, nil } diff --git a/pkg/query/logical/measure/topn_plan_localscan.go b/pkg/query/logical/measure/topn_plan_localscan.go index 9608af95..ee319af6 100644 --- a/pkg/query/logical/measure/topn_plan_localscan.go +++ b/pkg/query/logical/measure/topn_plan_localscan.go @@ -24,95 +24,105 @@ import ( "time" "github.com/pkg/errors" + "google.golang.org/protobuf/types/known/timestamppb" - commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" - "github.com/apache/skywalking-banyandb/pkg/index" - "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/banyand/measure" + "github.com/apache/skywalking-banyandb/pkg/encoding" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/query/executor" "github.com/apache/skywalking-banyandb/pkg/query/logical" "github.com/apache/skywalking-banyandb/pkg/query/model" + pkgschema "github.com/apache/skywalking-banyandb/pkg/schema" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) -var _ logical.UnresolvedPlan = (*unresolvedLocalScan)(nil) +var ( + _ logical.UnresolvedPlan = (*unresolvedLocalScan)(nil) + fieldProjection = []string{pkgschema.TopNFieldName} +) type unresolvedLocalScan struct { - startTime time.Time - endTime time.Time - metadata *commonv1.Metadata - 
conditions []*modelv1.Condition - projectionTags [][]*logical.Tag - projectionFields []*logical.Field - sort modelv1.Sort + startTime time.Time + endTime time.Time + name string + conditions []*modelv1.Condition + groupByTags []string + sort modelv1.Sort } func (uls *unresolvedLocalScan) Analyze(s logical.Schema) (logical.Plan, error) { - var projTagsRefs [][]*logical.TagRef - projTags := make([]model.TagProjection, len(uls.projectionTags)) - if len(uls.projectionTags) > 0 { - for i := range uls.projectionTags { - for _, tag := range uls.projectionTags[i] { - projTags[i].Family = tag.GetFamilyName() - projTags[i].Names = append(projTags[i].Names, tag.GetTagName()) - } - } - var err error - projTagsRefs, err = s.CreateTagRef(uls.projectionTags...) - if err != nil { - return nil, err - } + tr := timestamp.NewInclusiveTimeRange(uls.startTime, uls.endTime) + groupByTags, err := uls.parseGroupByTags() + if err != nil { + return nil, errors.Wrap(err, "failed to locate entity") } - - var projFieldRefs []*logical.FieldRef - var projField []string - if len(uls.projectionFields) > 0 { - for i := range uls.projectionFields { - projField = append(projField, uls.projectionFields[i].Name) - } - var err error - projFieldRefs, err = s.CreateFieldRef(uls.projectionFields...) 
- if err != nil { - return nil, err - } + entities := [][]*modelv1.TagValue{ + { + { + Value: &modelv1.TagValue_Str{ + Str: &modelv1.Str{ + Value: uls.name, + }, + }, + }, + { + Value: &modelv1.TagValue_Int{ + Int: &modelv1.Int{ + Value: int64(uls.sort), + }, + }, + }, + pbv1.AnyTagValue, + }, } - - entity, err := uls.locateEntity(s.EntityList()) - if err != nil { - return nil, err + if len(groupByTags) > 0 { + entities[0][len(entities[0])-1] = &modelv1.TagValue{ + Value: &modelv1.TagValue_Str{ + Str: &modelv1.Str{ + Value: measure.GroupName(groupByTags), + }, + }, + } } - return &localScan{ - timeRange: timestamp.NewInclusiveTimeRange(uls.startTime, uls.endTime), - schema: s, - projectionTagsRefs: projTagsRefs, - projectionFieldsRefs: projFieldRefs, - projectionTags: projTags, - projectionFields: projField, - metadata: uls.metadata, - entity: entity, - l: logger.GetLogger("topn", "measure", uls.metadata.Group, uls.metadata.Name, "local-index"), + s: s, + options: model.MeasureQueryOptions{ + Name: pkgschema.TopNSchemaName, + TimeRange: &tr, + Entities: entities, + TagProjection: []model.TagProjection{ + { + Family: pkgschema.TopNTagFamily, + Names: pkgschema.TopNTagNames, + }, + }, + FieldProjection: fieldProjection, + }, }, nil } -func (uls *unresolvedLocalScan) locateEntity(entityList []string) ([]*modelv1.TagValue, error) { +func (uls *unresolvedLocalScan) parseGroupByTags() ([]string, error) { + if len(uls.conditions) == 0 { + return nil, nil + } entityMap := make(map[string]int) - entity := make([]*modelv1.TagValue, len(entityList)) - for idx, tagName := range entityList { + entity := make([]string, len(uls.groupByTags)) + for idx, tagName := range uls.groupByTags { entityMap[tagName] = idx - // allow to make fuzzy search with partial conditions - entity[idx] = pbv1.AnyTagValue } + parsed := 0 for _, pairQuery := range uls.conditions { if pairQuery.GetOp() != modelv1.Condition_BINARY_OP_EQ { return nil, errors.Errorf("tag belongs to the entity only supports 
EQ operation in condition(%v)", pairQuery) } if entityIdx, ok := entityMap[pairQuery.GetName()]; ok { - entity[entityIdx] = pairQuery.Value switch pairQuery.GetValue().GetValue().(type) { case *modelv1.TagValue_Str, *modelv1.TagValue_Int, *modelv1.TagValue_Null: - entity[entityIdx] = pairQuery.Value + entity[entityIdx] = measure.Stringify(pairQuery.Value) + parsed++ default: return nil, errors.New("unsupported condition tag type for entity") } @@ -120,61 +130,37 @@ func (uls *unresolvedLocalScan) locateEntity(entityList []string) ([]*modelv1.Ta } return nil, errors.New("only groupBy tag name is supported") } + if parsed != len(uls.groupByTags) { + return nil, errors.New("failed to parse all groupBy tags") + } return entity, nil } -func local(startTime, endTime time.Time, metadata *commonv1.Metadata, projectionTags [][]*logical.Tag, - projectionFields []*logical.Field, conditions []*modelv1.Condition, sort modelv1.Sort, -) logical.UnresolvedPlan { - return &unresolvedLocalScan{ - startTime: startTime, - endTime: endTime, - metadata: metadata, - projectionTags: projectionTags, - projectionFields: projectionFields, - conditions: conditions, - sort: sort, - } -} - var _ logical.Plan = (*localScan)(nil) type localScan struct { - schema logical.Schema - metadata *commonv1.Metadata - l *logger.Logger - timeRange timestamp.TimeRange - projectionTagsRefs [][]*logical.TagRef - projectionFieldsRefs []*logical.FieldRef - projectionTags []model.TagProjection - projectionFields []string - entity []*modelv1.TagValue - sort modelv1.Sort + s logical.Schema + options model.MeasureQueryOptions } func (i *localScan) Execute(ctx context.Context) (mit executor.MIterator, err error) { ec := executor.FromMeasureExecutionContext(ctx) - result, err := ec.Query(ctx, model.MeasureQueryOptions{ - Name: i.metadata.GetName(), - TimeRange: &i.timeRange, - Entities: [][]*modelv1.TagValue{i.entity}, - Order: &index.OrderBy{Sort: i.sort}, - TagProjection: i.projectionTags, - FieldProjection: 
i.projectionFields, - }) + result, err := ec.Query(ctx, i.options) if err != nil { return nil, fmt.Errorf("failed to query measure: %w", err) } - return &resultMIterator{ - result: result, + return &topNMIterator{ + result: result, + topNValue: measure.GenerateTopNValue(), + decoder: generateTopNValuesDecoder(), }, nil } func (i *localScan) String() string { - return fmt.Sprintf("IndexScan: startTime=%d,endTime=%d,Metadata{group=%s,name=%s}; projection=%s; sort=%s;", - i.timeRange.Start.Unix(), i.timeRange.End.Unix(), i.metadata.GetGroup(), i.metadata.GetName(), - logical.FormatTagRefs(", ", i.projectionTagsRefs...), i.sort) + return fmt.Sprintf("TopNAggScan: %s startTime=%d,endTime=%d, entity=%s;", + i.options.Name, i.options.TimeRange.Start.Unix(), i.options.TimeRange.End.Unix(), + i.options.Entities) } func (i *localScan) Children() []logical.Plan { @@ -182,8 +168,101 @@ func (i *localScan) Children() []logical.Plan { } func (i *localScan) Schema() logical.Schema { - if len(i.projectionTagsRefs) == 0 { - return i.schema + return i.s +} + +type topNMIterator struct { + result model.MeasureQueryResult + err error + topNValue *measure.TopNValue + decoder *encoding.BytesBlockDecoder + current []*measurev1.DataPoint +} + +func (ei *topNMIterator) Next() bool { + if ei.result == nil { + return false } - return i.schema.ProjTags(i.projectionTagsRefs...).ProjFields(i.projectionFieldsRefs...) 
+ + r := ei.result.Pull() + if r == nil { + return false + } + if r.Error != nil { + ei.err = r.Error + return false + } + ei.current = ei.current[:0] + + for i := range r.Timestamps { + fv := r.Fields[0].Values[i] + bd := fv.GetBinaryData() + if bd == nil { + ei.err = errors.New("failed to get binary data") + return false + } + ei.topNValue.Reset() + ei.err = ei.topNValue.Unmarshal(bd, ei.decoder) + if ei.err != nil { + return false + } + fieldName, entityNames, values, entities := ei.topNValue.Values() + for j := range entities { + dp := &measurev1.DataPoint{ + Timestamp: timestamppb.New(time.Unix(0, r.Timestamps[i])), + Sid: uint64(r.SID), + Version: r.Versions[i], + } + tagFamily := &modelv1.TagFamily{ + Name: pkgschema.TopNTagFamily, + } + dp.TagFamilies = append(dp.TagFamilies, tagFamily) + for k, entityName := range entityNames { + tagFamily.Tags = append(tagFamily.Tags, &modelv1.Tag{ + Key: entityName, + Value: entities[j][k], + }) + } + dp.Fields = append(dp.Fields, &measurev1.DataPoint_Field{ + Name: fieldName, + Value: &modelv1.FieldValue{ + Value: &modelv1.FieldValue_Int{ + Int: &modelv1.Int{ + Value: values[j], + }, + }, + }, + }) + ei.current = append(ei.current, dp) + } + } + return true } + +func (ei *topNMIterator) Current() []*measurev1.DataPoint { + return ei.current +} + +func (ei *topNMIterator) Close() error { + if ei.result != nil { + ei.result.Release() + } + releaseTopNValuesDecoder(ei.decoder) + measure.ReleaseTopNValue(ei.topNValue) + return ei.err +} + +func generateTopNValuesDecoder() *encoding.BytesBlockDecoder { + v := topNValuesDecoderPool.Get() + if v == nil { + return &encoding.BytesBlockDecoder{} + } + return v +} + +func releaseTopNValuesDecoder(d *encoding.BytesBlockDecoder) { + d.Reset() + topNValuesDecoderPool.Put(d) +} + +var topNValuesDecoderPool = pool.Register[*encoding.BytesBlockDecoder]("topn-valueDecoder") diff --git a/pkg/schema/cache.go b/pkg/schema/cache.go index 9abe21ec..7fcd31f4 100644 --- a/pkg/schema/cache.go 
+++ b/pkg/schema/cache.go @@ -203,7 +203,7 @@ func (sr *schemaRepo) Watcher() { err = sr.initResource(evt.Metadata.GetMetadata()) case EventKindTopNAgg: topNSchema := evt.Metadata.(*databasev1.TopNAggregation) - _, err = createOrUpdateTopNMeasure(context.Background(), sr.metadata.MeasureRegistry(), topNSchema) + err = createTopNResultMeasure(context.Background(), sr.metadata.MeasureRegistry(), topNSchema.GetMetadata().Group) if err != nil { break } @@ -217,10 +217,7 @@ func (sr *schemaRepo) Watcher() { err = sr.deleteResource(evt.Metadata.GetMetadata()) case EventKindTopNAgg: topNSchema := evt.Metadata.(*databasev1.TopNAggregation) - err = multierr.Combine( - sr.deleteResource(topNSchema.SourceMeasure), - sr.initResource(topNSchema.SourceMeasure), - ) + err = sr.initResource(topNSchema.SourceMeasure) } } if err != nil && !errors.Is(err, schema.ErrClosed) { diff --git a/pkg/schema/init.go b/pkg/schema/init.go index a158daab..5050e4bc 100644 --- a/pkg/schema/init.go +++ b/pkg/schema/init.go @@ -22,9 +22,7 @@ import ( "fmt" "time" - "github.com/google/go-cmp/cmp" "github.com/pkg/errors" - "google.golang.org/protobuf/testing/protocmp" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" @@ -32,16 +30,23 @@ import ( "github.com/apache/skywalking-banyandb/pkg/logger" ) -const topNTagFamily = "__topN__" +const ( + // TopNTagFamily is the tag family name of the topN result measure. + TopNTagFamily = "_topN" + // TopNFieldName is the field name of the topN result measure. 
+ TopNFieldName = "value" +) var ( - initTimeout = 10 * time.Second - topNValueFieldSpec = &databasev1.FieldSpec{ - Name: "value", - FieldType: databasev1.FieldType_FIELD_TYPE_INT, + initTimeout = 10 * time.Second + topNFieldsSpec = []*databasev1.FieldSpec{{ + Name: TopNFieldName, + FieldType: databasev1.FieldType_FIELD_TYPE_DATA_BINARY, EncodingMethod: databasev1.EncodingMethod_ENCODING_METHOD_GORILLA, CompressionMethod: databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD, - } + }} + // TopNTagNames is the tag names of the topN result measure. + TopNTagNames = []string{"name", "direction", "group"} ) type revisionContext struct { @@ -154,6 +159,12 @@ func (sr *schemaRepo) getRules(ctx context.Context, gName string) map[string]*da func (sr *schemaRepo) processMeasure(ctx context.Context, gName string, group *group, bindings map[string][]string, rules map[string]*databasev1.IndexRule) { aggMap := sr.getAggMap(ctx, gName) + if len(aggMap) > 0 { + if err := createTopNResultMeasure(ctx, sr.metadata.MeasureRegistry(), gName); err != nil { + logger.Panicf("fails to create the topN result measure: %v", err) + return + } + } ctx, cancel := context.WithTimeout(ctx, initTimeout) defer cancel() @@ -163,30 +174,8 @@ func (sr *schemaRepo) processMeasure(ctx context.Context, gName string, group *g logger.Panicf("fails to get the measures: %v", err) return } - revCtx := ctx.Value(revCtxKey).(*revisionContext) - measureMap := make(map[string]*databasev1.Measure, len(mm)) - for _, m := range mm { - measureMap[m.Metadata.Name] = m - } - for _, aa := range aggMap { - for _, a := range aa { - sourceMeasre, ok := measureMap[a.SourceMeasure.Name] - if !ok { - sr.l.Warn().Str("group", gName).Str("measure", a.SourceMeasure.Name).Str("agg", a.Metadata.Name).Msg("source measure not found") - continue - } - if _, ok := measureMap[a.Metadata.Name]; !ok { - sr.l.Info().Str("group", gName).Str("measure", a.SourceMeasure.Name).Str("agg", a.Metadata.Name).Msg("remove topN aggregation") - m, err 
:= createTopNMeasure(ctx, sr.metadata.MeasureRegistry(), a, sourceMeasre) - if err != nil { - logger.Panicf("fails to create or update the topN measure: %v", err) - return - } - mm = append(mm, m) - } - } - } + revCtx := ctx.Value(revCtxKey).(*revisionContext) for _, m := range mm { if m.Metadata.ModRevision > revCtx.measure { revCtx.measure = m.Metadata.ModRevision @@ -228,7 +217,9 @@ func (sr *schemaRepo) storeMeasure(m *databasev1.Measure, group *group, bindings var indexRules []*databasev1.IndexRule if rr, ok := bindings[m.Metadata.GetName()]; ok { for _, r := range rr { - indexRules = append(indexRules, rules[r]) + if rule, ok := rules[r]; ok { + indexRules = append(indexRules, rule) + } } } var topNAggr []*databasev1.TopNAggregation @@ -263,7 +254,9 @@ func (sr *schemaRepo) storeStream(s *databasev1.Stream, group *group, bindings m var indexRules []*databasev1.IndexRule if rr, ok := bindings[s.Metadata.GetName()]; ok { for _, r := range rr { - indexRules = append(indexRules, rules[r]) + if rule, ok := rules[r]; ok { + indexRules = append(indexRules, rule) + } } } if err := sr.storeResource(group, s, indexRules, nil); err != nil { @@ -284,107 +277,50 @@ func (sr *schemaRepo) initGroup(groupSchema *commonv1.Group) (*group, error) { return g, nil } -func createOrUpdateTopNMeasure(ctx context.Context, measureSchemaRegistry schema.Measure, topNSchema *databasev1.TopNAggregation) (*databasev1.Measure, error) { - oldTopNSchema, err := measureSchemaRegistry.GetMeasure(ctx, topNSchema.GetMetadata()) +func createTopNResultMeasure(ctx context.Context, measureSchemaRegistry schema.Measure, group string) error { + md := GetTopNSchemaMetadata(group) + m, err := measureSchemaRegistry.GetMeasure(ctx, md) if err != nil && !errors.Is(err, schema.ErrGRPCResourceNotFound) { - return nil, errors.WithMessagef(err, "fail to get current topN measure %s", topNSchema.GetMetadata().GetName()) + return errors.WithMessagef(err, "fail to get %s", md) } - - sourceMeasureSchema, err := 
measureSchemaRegistry.GetMeasure(ctx, topNSchema.GetSourceMeasure()) - if err != nil { - return nil, errors.WithMessagef(err, "fail to get source measure %s", topNSchema.GetSourceMeasure().GetName()) + if m != nil { + return nil } - // create a new "derived" measure for TopN result - newTopNMeasure, err := buildTopNSourceMeasure(topNSchema, sourceMeasureSchema) - if err != nil { - return nil, err - } - if oldTopNSchema == nil { - if _, innerErr := measureSchemaRegistry.CreateMeasure(ctx, newTopNMeasure); innerErr != nil { - if !errors.Is(innerErr, schema.ErrGRPCAlreadyExists) { - return nil, errors.WithMessagef(innerErr, "fail to create new topN measure %s", newTopNMeasure.GetMetadata().GetName()) - } - newTopNMeasure, err = measureSchemaRegistry.GetMeasure(ctx, topNSchema.GetMetadata()) - if err != nil { - return nil, errors.WithMessagef(err, "fail to get created topN measure %s", topNSchema.GetMetadata().GetName()) - } + m = GetTopNSchema(md) + if _, innerErr := measureSchemaRegistry.CreateMeasure(ctx, m); innerErr != nil { + if !errors.Is(innerErr, schema.ErrGRPCAlreadyExists) { + return errors.WithMessagef(innerErr, "fail to create new topN measure %s", m) } - return newTopNMeasure, nil - } - // compare with the old one - if cmp.Diff(newTopNMeasure, oldTopNSchema, - protocmp.IgnoreUnknown(), - protocmp.IgnoreFields(&databasev1.Measure{}, "updated_at"), - protocmp.IgnoreFields(&commonv1.Metadata{}, "id", "create_revision", "mod_revision"), - protocmp.Transform()) == "" { - return oldTopNSchema, nil - } - // update - if _, err = measureSchemaRegistry.UpdateMeasure(ctx, newTopNMeasure); err != nil { - return nil, errors.WithMessagef(err, "fail to update topN measure %s", newTopNMeasure.GetMetadata().GetName()) - } - return newTopNMeasure, nil -} - -func createTopNMeasure(ctx context.Context, measureSchemaRegistry schema.Measure, topNSchema *databasev1.TopNAggregation, - sourceMeasureSchema *databasev1.Measure, -) (*databasev1.Measure, error) { - newTopNMeasure, 
err := buildTopNSourceMeasure(topNSchema, sourceMeasureSchema) - if err != nil { - return nil, err - } - if _, err := measureSchemaRegistry.CreateMeasure(ctx, newTopNMeasure); err != nil { - return nil, err } - return newTopNMeasure, nil + return nil } -func buildTopNSourceMeasure(topNSchema *databasev1.TopNAggregation, sourceMeasureSchema *databasev1.Measure) (*databasev1.Measure, error) { - tagNames := sourceMeasureSchema.GetEntity().GetTagNames() - seriesSpecs := make([]*databasev1.TagSpec, 0, len(tagNames)) - - for _, tagName := range tagNames { - var found bool - for _, fSpec := range sourceMeasureSchema.GetTagFamilies() { - for _, tSpec := range fSpec.GetTags() { - if tSpec.GetName() == tagName { - seriesSpecs = append(seriesSpecs, tSpec) - found = true - goto CHECK - } - } - } - - CHECK: - if !found { - return nil, fmt.Errorf("fail to find tag spec %s", tagName) - } - } - // create a new "derived" measure for TopN result +// GetTopNSchema returns the schema of the topN result measure. 
+func GetTopNSchema(md *commonv1.Metadata) *databasev1.Measure { return &databasev1.Measure{ - Metadata: topNSchema.Metadata, - Interval: sourceMeasureSchema.GetInterval(), + Metadata: md, TagFamilies: []*databasev1.TagFamilySpec{ { - Name: topNTagFamily, - Tags: append(seriesSpecs, - &databasev1.TagSpec{ - Name: "sortDirection", - Type: databasev1.TagType_TAG_TYPE_INT, - }, - &databasev1.TagSpec{ - Name: "rankNumber", - Type: databasev1.TagType_TAG_TYPE_INT, - }, - ), + Name: TopNTagFamily, + Tags: []*databasev1.TagSpec{ + {Name: TopNTagNames[0], Type: databasev1.TagType_TAG_TYPE_STRING}, + {Name: TopNTagNames[1], Type: databasev1.TagType_TAG_TYPE_INT}, + {Name: TopNTagNames[2], Type: databasev1.TagType_TAG_TYPE_STRING}, + }, }, }, - Fields: []*databasev1.FieldSpec{topNValueFieldSpec}, + Fields: topNFieldsSpec, Entity: &databasev1.Entity{ - TagNames: append(topNSchema.GetGroupByTagNames(), - "sortDirection", - "rankNumber"), + TagNames: TopNTagNames, }, - }, nil + } +} + +// GetTopNSchemaMetadata returns the metadata of the topN result measure. +func GetTopNSchemaMetadata(group string) *commonv1.Metadata { + return &commonv1.Metadata{ + Name: TopNSchemaName, + Group: group, + } } diff --git a/pkg/schema/schema.go b/pkg/schema/schema.go index 2ac35232..6ee3c2d0 100644 --- a/pkg/schema/schema.go +++ b/pkg/schema/schema.go @@ -26,6 +26,9 @@ import ( "github.com/apache/skywalking-banyandb/banyand/metadata/schema" ) +// TopNSchemaName is the name of the top n result schema. +const TopNSchemaName = "_top_n_result" + // EventType defines actions of events. 
type EventType uint8 diff --git a/test/cases/topn/data/want/aggr_desc.yaml b/test/cases/topn/data/want/aggr_desc.yaml index b39f77e7..27895621 100644 --- a/test/cases/topn/data/want/aggr_desc.yaml +++ b/test/cases/topn/data/want/aggr_desc.yaml @@ -44,12 +44,11 @@ lists: - entity: - key: service_id value: - str: - value: "" + "null": null - key: entity_id value: str: value: entity_2 value: int: - value: "10" \ No newline at end of file + value: "10"
