This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch index in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit e3f4e60e611972b989ad152c283d00da986eb699 Author: Gao Hongtao <[email protected]> AuthorDate: Wed Nov 13 14:00:22 2024 +0800 Change the options to "index_mode" Signed-off-by: Gao Hongtao <[email protected]> --- api/proto/banyandb/database/v1/schema.proto | 5 +- banyand/measure/measure.go | 2 +- banyand/measure/query.go | 36 +++++---- banyand/measure/write.go | 113 +++++++++++++++------------- banyand/stream/stream.go | 2 +- dist/LICENSE | 2 +- docs/api-reference.md | 2 +- go.mod | 2 +- go.sum | 6 +- pkg/index/index.go | 4 +- pkg/index/inverted/inverted.go | 5 +- pkg/index/inverted/inverted_series.go | 35 +++------ pkg/index/inverted/sort.go | 2 +- pkg/partition/index.go | 5 +- pkg/query/model/model.go | 20 ----- 15 files changed, 105 insertions(+), 136 deletions(-) diff --git a/api/proto/banyandb/database/v1/schema.proto b/api/proto/banyandb/database/v1/schema.proto index 83fbeb23..75876e3a 100644 --- a/api/proto/banyandb/database/v1/schema.proto +++ b/api/proto/banyandb/database/v1/schema.proto @@ -115,8 +115,9 @@ message Measure { string interval = 5; // updated_at indicates when the measure is updated google.protobuf.Timestamp updated_at = 6; - // non_time_series indicates whether the measure is a time series - bool non_time_series = 7; + // index_mode specifies whether the data should be stored exclusively in the index, + // meaning it will not be stored in the data storage system. + bool index_mode = 7; } message MeasureAggregateFunction { diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go index f278735e..40a10898 100644 --- a/banyand/measure/measure.go +++ b/banyand/measure/measure.go @@ -103,7 +103,7 @@ func (s *measure) parseSpec() (err error) { if s.schema.Interval != "" { s.interval, err = timestamp.ParseDuration(s.schema.Interval) } - s.indexRuleLocators, s.fieldIndexLocation = partition.ParseIndexRuleLocators(s.schema.GetEntity(), s.schema.GetTagFamilies(), s.indexRules) + s.indexRuleLocators, s.fieldIndexLocation = partition.ParseIndexRuleLocators(s.schema.GetEntity(), s.schema.GetTagFamilies(), s.indexRules, s.schema.IndexMode) return err } diff --git a/banyand/measure/query.go b/banyand/measure/query.go index 457e9066..32399332 100644 --- a/banyand/measure/query.go +++ b/banyand/measure/query.go @@ -90,10 +90,10 @@ func (s *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) (mqr tsdb := db.(storage.TSDB[*tsTable, option]) segments := tsdb.SelectSegments(*mqo.TimeRange) if len(segments) < 1 { - return model.BypassResult, nil + return nil, nil } - if s.schema.NonTimeSeries { + if s.schema.IndexMode { return s.buildIndexQueryResult(ctx, series, mqo, segments) } @@ -102,12 +102,15 @@ func (s *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) (mqr return nil, err } if len(sids) < 1 { - return model.BypassResult, nil + for i := range segments { + segments[i].DecRef() + } + return nil, nil } result := queryResult{ ctx: ctx, segments: segments, - tagProjection: newTagProjection, + tagProjection: mqo.TagProjection, storedIndexValue: storedIndexValue, } defer func() { @@ -682,25 +685,20 @@ func (qr *queryResult) merge(storedIndexValue map[common.SeriesID]map[string]*mo return result } -var ( - bypassVersions = []int64{0} - bypassFields = []model.Field{} -) +var bypassVersions = []int64{0} type indexQueryResult struct { - mqo model.MeasureQueryOptions + ctx context.Context + err error tfl []tagFamilyLocation indexProjection []index.FieldKey - ctx context.Context series []*pbv1.Series - - segments []storage.Segment[*tsTable, option] - - i int - sll pbv1.SeriesList - frl storage.FieldResultList - timestamps []int64 - err error + segments []storage.Segment[*tsTable, option] + sll pbv1.SeriesList + frl storage.FieldResultList + timestamps []int64 + mqo model.MeasureQueryOptions + i int } // Pull implements model.MeasureQueryResult. @@ -768,7 +766,7 @@ func (i *indexQueryResult) Release() { } type tagFamilyLocation struct { - name string fieldToValueType map[string]tagNameWithType projectedEntityOffsets map[string]int + name string } diff --git a/banyand/measure/write.go b/banyand/measure/write.go index 6f5a9659..ab0879a2 100644 --- a/banyand/measure/write.go +++ b/banyand/measure/write.go @@ -125,6 +125,61 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me )) } dpt.dataPoints.fields = append(dpt.dataPoints.fields, field) + tagFamily, fields := w.handleTagFamily(stm, req) + dpt.dataPoints.tagFamilies = append(dpt.dataPoints.tagFamilies, tagFamily) + + if stm.processorManager != nil { + stm.processorManager.onMeasureWrite(uint64(series.ID), &measurev1.InternalWriteRequest{ + Request: &measurev1.WriteRequest{ + Metadata: stm.GetSchema().Metadata, + DataPoint: req.DataPoint, + MessageId: uint64(time.Now().UnixNano()), + }, + EntityValues: writeEvent.EntityValues, + }) + } + + doc := index.Document{ + DocID: uint64(series.ID), + EntityValues: series.Buffer, + Fields: fields, + } + if stm.schema.IndexMode { + doc.Timestamp = ts + } + dpg.docs = append(dpg.docs, doc) + + return dst, nil +} + +func (w *writeCallback) newDpt(tsdb storage.TSDB[*tsTable, option], dpg *dataPointsInGroup, t time.Time, ts int64, shardID common.ShardID) (*dataPointsInTable, error) { + var segment storage.Segment[*tsTable, option] + for _, seg := range dpg.segments { + if seg.GetTimeRange().Contains(ts) { + segment = seg + } + } + if segment == nil { + var err error + segment, err = tsdb.CreateSegmentIfNotExist(t) + if err != nil { + return nil, fmt.Errorf("cannot create segment: %w", err) + } + dpg.segments = append(dpg.segments, segment) + } + tstb, err := segment.CreateTSTableIfNotExist(shardID) + if err != nil { + return nil, fmt.Errorf("cannot create ts table: %w", err) + } + dpt := &dataPointsInTable{ + timeRange: segment.GetTimeRange(), + tsTable: tstb, + } + dpg.tables = append(dpg.tables, dpt) + return dpt, nil +} + +func (w *writeCallback) handleTagFamily(stm *measure, req *measurev1.WriteRequest) ([]nameValues, []index.Field) { tagFamilies := make([]nameValues, 0, len(stm.schema.TagFamilies)) if len(stm.indexRuleLocators.TagFamilyTRule) != len(stm.GetSchema().GetTagFamilies()) { logger.Panicf("metadata crashed, tag family rule length %d, tag family length %d", @@ -157,18 +212,18 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me t.Type, tagValue) r, ok := tfr[t.Name] - if ok || stm.schema.NonTimeSeries { + if ok || stm.schema.IndexMode { fieldKey := index.FieldKey{} switch { case ok: fieldKey.IndexRuleID = r.GetMetadata().GetId() fieldKey.Analyzer = r.Analyzer - case stm.schema.NonTimeSeries: + case stm.schema.IndexMode: fieldKey.TagName = t.Name default: logger.Panicf("metadata crashed, tag family rule %s not found", t.Name) } - toIndex := ok || !stm.schema.NonTimeSeries + toIndex := ok || !stm.schema.IndexMode if encodeTagValue.value != nil { fields = append(fields, index.Field{ Key: fieldKey, @@ -200,57 +255,7 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me tagFamilies = append(tagFamilies, tf) } } - dpt.dataPoints.tagFamilies = append(dpt.dataPoints.tagFamilies, tagFamilies) - - if stm.processorManager != nil { - stm.processorManager.onMeasureWrite(uint64(series.ID), &measurev1.InternalWriteRequest{ - Request: &measurev1.WriteRequest{ - Metadata: stm.GetSchema().Metadata, - DataPoint: req.DataPoint, - MessageId: uint64(time.Now().UnixNano()), - }, - EntityValues: writeEvent.EntityValues, - }) - } - - doc := index.Document{ - DocID: uint64(series.ID), - EntityValues: series.Buffer, - Fields: fields, - } - if stm.schema.NonTimeSeries { - doc.Timestamp = ts - } - dpg.docs = append(dpg.docs, doc) - - return dst, nil -} - -func (w *writeCallback) newDpt(tsdb storage.TSDB[*tsTable, option], dpg *dataPointsInGroup, t time.Time, ts int64, shardID common.ShardID) (*dataPointsInTable, error) { - var segment storage.Segment[*tsTable, option] - for _, seg := range dpg.segments { - if seg.GetTimeRange().Contains(ts) { - segment = seg - } - } - if segment == nil { - var err error - segment, err = tsdb.CreateSegmentIfNotExist(t) - if err != nil { - return nil, fmt.Errorf("cannot create segment: %w", err) - } - dpg.segments = append(dpg.segments, segment) - } - tstb, err := segment.CreateTSTableIfNotExist(shardID) - if err != nil { - return nil, fmt.Errorf("cannot create ts table: %w", err) - } - dpt := &dataPointsInTable{ - timeRange: segment.GetTimeRange(), - tsTable: tstb, - } - dpg.tables = append(dpg.tables, dpt) - return dpt, nil + return tagFamilies, fields } func (w *writeCallback) Rev(_ context.Context, message bus.Message) (resp bus.Message) { diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go index 60596f57..1243a38e 100644 --- a/banyand/stream/stream.go +++ b/banyand/stream/stream.go @@ -89,7 +89,7 @@ func (s *stream) Close() error { func (s *stream) parseSpec() { s.name, s.group = s.schema.GetMetadata().GetName(), s.schema.GetMetadata().GetGroup() - s.indexRuleLocators, _ = partition.ParseIndexRuleLocators(s.schema.GetEntity(), s.schema.GetTagFamilies(), s.indexRules) + s.indexRuleLocators, _ = partition.ParseIndexRuleLocators(s.schema.GetEntity(), s.schema.GetTagFamilies(), s.indexRules, false) } type streamSpec struct { diff --git a/dist/LICENSE b/dist/LICENSE index a6503207..f90ac0ca 100644 --- a/dist/LICENSE +++ b/dist/LICENSE @@ -326,7 +326,7 @@ MIT licenses github.com/go-ole/go-ole v1.3.0 MIT github.com/go-resty/resty/v2 v2.14.0 MIT github.com/go-task/slim-sprig/v3 v3.0.0 MIT - github.com/golang-jwt/jwt/v4 v4.5.0 MIT + github.com/golang-jwt/jwt/v4 v4.5.1 MIT github.com/json-iterator/go v1.1.12 MIT github.com/mattn/go-colorable v0.1.13 MIT github.com/mattn/go-isatty v0.0.20 MIT diff --git a/docs/api-reference.md b/docs/api-reference.md index eb008338..4954ce1f 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -1076,7 +1076,7 @@ Measure intends to store data point | entity | [Entity](#banyandb-database-v1-Entity) | | entity indicates which tags will be to generate a series and shard a measure | | interval | [string](#string) | | interval indicates how frequently to send a data point valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h", "d". | | updated_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | updated_at indicates when the measure is updated | -| non_time_series | [bool](#bool) | | non_time_series indicates whether the measure is a time series | +| index_mode | [bool](#bool) | | index_mode specifies whether the data should be stored exclusively in the index, meaning it will not be stored in the data storage system. | diff --git a/go.mod b/go.mod index 56646ccf..57454f71 100644 --- a/go.mod +++ b/go.mod @@ -83,7 +83,7 @@ require ( github.com/go-logr/stdr v1.2.2 // indirect github.com/go-ole/go-ole v1.3.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang-jwt/jwt/v4 v4.5.0 // indirect + github.com/golang-jwt/jwt/v4 v4.5.1 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/btree v1.1.3 // indirect github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 // indirect diff --git a/go.sum b/go.sum index d833ec3d..5c140a3b 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,6 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE github.com/RoaringBitmap/roaring v0.9.4/go.mod h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA= github.com/RoaringBitmap/roaring v1.9.4 h1:yhEIoH4YezLYT04s1nHehNO64EKFTop/wBhxv2QzDdQ= github.com/RoaringBitmap/roaring v1.9.4/go.mod h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90= -github.com/SkyAPM/bluge v0.0.0-20241110125856-046bc03b30ab h1:iCQVR0hi6Kd4Pzh/QDHrBIK8TDTKzwEEzrYhdvqoPRg= -github.com/SkyAPM/bluge v0.0.0-20241110125856-046bc03b30ab/go.mod h1:6o9wC3xO3qb5Q7VmD1x0r54qQBDpO9+ghGAQvuOHsCU= github.com/SkyAPM/bluge v0.0.0-20241111124917-c317df1af201 h1:QX/WvtL8j5Zrbs68EVEiOE2nFQSvoT5oTkOFh2uNSpg= github.com/SkyAPM/bluge v0.0.0-20241111124917-c317df1af201/go.mod h1:6o9wC3xO3qb5Q7VmD1x0r54qQBDpO9+ghGAQvuOHsCU= github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97 h1:FKuhJ+6n/DHspGeLleeNbziWnKr9gHKYN4q7NcoCp4s= @@ -104,8 +102,8 @@ github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZ github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg= -github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= +github.com/golang-jwt/jwt/v4 v4.5.1 h1:JdqV9zKUdtaa9gdPlywC3aeoEsR681PlKC+4F5gQgeo= +github.com/golang-jwt/jwt/v4 v4.5.1/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= diff --git a/pkg/index/index.go b/pkg/index/index.go index 11e6e379..fc1b8f5c 100644 --- a/pkg/index/index.go +++ b/pkg/index/index.go @@ -49,9 +49,9 @@ const ( // FieldKey is the key of field in a document. type FieldKey struct { Analyzer string + TagName string SeriesID common.SeriesID IndexRuleID uint32 - TagName string } // Marshal encodes f to string. @@ -221,8 +221,8 @@ func (s Series) SortedField() []byte { // SeriesDocument represents a series document in an index. type SeriesDocument struct { Fields map[string][]byte - Timestamp int64 Key Series + Timestamp int64 } // SeriesStore is an abstract of a series repository. diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go index 9a51ed6b..084ef729 100644 --- a/pkg/index/inverted/inverted.go +++ b/pkg/index/inverted/inverted.go @@ -58,6 +58,7 @@ var ( defaultUpper = convert.Uint64ToBytes(math.MaxUint64) defaultLower = convert.Uint64ToBytes(0) defaultRangePreloadSize = 1000 + defaultProjection = []string{docIDField} ) // Analyzers is a map that associates each IndexRule_Analyzer type with a corresponding Analyzer. @@ -244,7 +245,7 @@ func (s *store) MatchTerms(field index.Field) (list posting.List, err error) { if err != nil { return nil, err } - iter := newBlugeMatchIterator(documentMatchIterator, reader, nil) + iter := newBlugeMatchIterator(documentMatchIterator, reader, defaultProjection) defer func() { err = multierr.Append(err, iter.Close()) }() @@ -275,7 +276,7 @@ func (s *store) Match(fieldKey index.FieldKey, matches []string, opts *modelv1.C if err != nil { return nil, err } - iter := newBlugeMatchIterator(documentMatchIterator, reader, nil) + iter := newBlugeMatchIterator(documentMatchIterator, reader, defaultProjection) defer func() { err = multierr.Append(err, iter.Close()) }() diff --git a/pkg/index/inverted/inverted_series.go b/pkg/index/inverted/inverted_series.go index 1f245c67..9d0b15ed 100644 --- a/pkg/index/inverted/inverted_series.go +++ b/pkg/index/inverted/inverted_series.go @@ -144,10 +144,6 @@ func (s *store) Search(ctx context.Context, return nil, err } defer func() { - if err := recover(); err != nil { - _ = reader.Close() - panic(err) - } if err := recover(); err != nil { _ = reader.Close() panic(err) @@ -169,13 +165,11 @@ func parseResult(dmi search.DocumentMatchIterator, loadedFields []index.FieldKey return nil, errors.WithMessage(err, "iterate document match iterator") } docIDMap := make(map[uint64]struct{}) - fields := make([]string, 0, len(loadedFields)+3) - fields = append(fields, docIDField, entityField, timestampField) + fields := make([]string, 0, len(loadedFields)) for i := range loadedFields { fields = append(fields, loadedFields[i].Marshal()) } var hitNumber int - ctx := search.NewSearchContext(1, 0) for err == nil && next != nil { hitNumber = next.HitNumber var doc index.SeriesDocument @@ -185,17 +179,8 @@ func parseResult(dmi search.DocumentMatchIterator, loadedFields []index.FieldKey doc.Fields[fields[i]] = nil } } - err = next.LoadDocumentValues(ctx, fields) - if err != nil { - return nil, errors.WithMessagef(err, "visit stored fields, hit: %d", hitNumber) - } - for i := range fields { - vv := next.DocValues(fields[i]) - if vv == nil { - continue - } - value := vv[0] - switch fields[i] { + err = next.VisitStoredFields(func(field string, value []byte) bool { + switch field { case docIDField: id := convert.BytesToUint64(value) if _, ok := docIDMap[id]; !ok { @@ -204,17 +189,15 @@ func parseResult(dmi search.DocumentMatchIterator, loadedFields []index.FieldKey } case entityField: doc.Key.EntityValues = value - case timestampField: - ts, errTime := bluge.DecodeDateTime(value) - if errTime != nil { - return nil, err - } - doc.Timestamp = ts.UnixNano() default: - if _, ok := doc.Fields[fields[i]]; ok { - doc.Fields[fields[i]] = bytes.Clone(value) + if _, ok := doc.Fields[field]; ok { + doc.Fields[field] = bytes.Clone(value) } } + return true + }) + if err != nil { + return nil, errors.WithMessagef(err, "visit stored fields, hit: %d", hitNumber) } if doc.Key.ID > 0 { result = append(result, doc) diff --git a/pkg/index/inverted/sort.go b/pkg/index/inverted/sort.go index 68071cc9..23cc57df 100644 --- a/pkg/index/inverted/sort.go +++ b/pkg/index/inverted/sort.go @@ -119,7 +119,7 @@ func (si *sortIterator) loadCurrent() bool { size := si.size + si.skipped if size < 0 { // overflow - size = math.MaxInt64 + size = math.MaxInt } topNSearch := bluge.NewTopNSearch(size, si.query.(*queryNode).query).SortBy([]string{si.sortedKey}) if si.skipped > 0 { diff --git a/pkg/partition/index.go b/pkg/partition/index.go index 133dc636..bbb4daf0 100644 --- a/pkg/partition/index.go +++ b/pkg/partition/index.go @@ -40,7 +40,7 @@ type FieldIndexLocation map[string]map[string]FieldWithType // ParseIndexRuleLocators returns a IndexRuleLocator based on the tag family spec and index rules. func ParseIndexRuleLocators(entity *databasev1.Entity, families []*databasev1.TagFamilySpec, - indexRules []*databasev1.IndexRule, + indexRules []*databasev1.IndexRule, indexMode bool, ) (locators IndexRuleLocator, fil FieldIndexLocation) { locators.EntitySet = make(map[string]struct{}, len(entity.TagNames)) fil = make(FieldIndexLocation) @@ -65,6 +65,9 @@ func ParseIndexRuleLocators(entity *databasev1.Entity, families []*databasev1.Ta if ir != nil { ttr[families[i].Tags[j].Name] = ir } + if ir == nil && !indexMode { + continue + } tagFamily, ok := fil[families[i].Name] if !ok { tagFamily = make(map[string]FieldWithType) diff --git a/pkg/query/model/model.go b/pkg/query/model/model.go index c307918d..022bcdbc 100644 --- a/pkg/query/model/model.go +++ b/pkg/query/model/model.go @@ -98,26 +98,6 @@ type MeasureQueryResult interface { Release() } -var ( - BypassResult = &bypassResult{} - dummyResult = &MeasureResult{} -) - -// bypassResult struct. -type bypassResult struct { - // Add fields as necessary -} - -// Implement Pull method. -func (b *bypassResult) Pull() *MeasureResult { - return dummyResult -} - -// Implement Release method. -func (b *bypassResult) Release() { - // No operation -} - // StreamQueryOptions is the options of a stream query. type StreamQueryOptions struct { Name string
