This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch sort in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 8bb492495fe4278d2971de57c0e0121f607628ae Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Mon Apr 1 00:22:27 2024 +0000 Add preload flag to index topN query Signed-off-by: Gao Hongtao <hanahm...@gmail.com> --- banyand/internal/storage/index.go | 4 +-- banyand/internal/storage/storage.go | 2 +- banyand/measure/query.go | 6 +++- banyand/stream/index.go | 4 +-- banyand/stream/iter_builder.go | 11 +++--- banyand/tsdb/series_seek_sort.go | 6 ++-- banyand/tsdb/seriesdb.go | 4 +-- pkg/index/index.go | 7 ++-- pkg/index/inverted/inverted.go | 37 ++++++++++++------- pkg/index/inverted/sort.go | 56 ++++++++++++++++++----------- pkg/index/lsm/search.go | 5 +-- pkg/index/testcases/duration.go | 49 ++++++++++++++++++++----- test/stress/trace/trace-duration/data.csv | 29 --------------- test/stress/trace/trace-duration/result.csv | 2 -- test/stress/trace/trace_suite_test.go | 1 + 15 files changed, 128 insertions(+), 95 deletions(-) diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go index bf50a0c8..e85b2178 100644 --- a/banyand/internal/storage/index.go +++ b/banyand/internal/storage/index.go @@ -149,7 +149,7 @@ func convertIndexSeriesToSeriesList(indexSeries []index.Series) (pbv1.SeriesList return seriesList, nil } -func (s *seriesIndex) Search(ctx context.Context, series *pbv1.Series, filter index.Filter, order *pbv1.OrderBy) (pbv1.SeriesList, error) { +func (s *seriesIndex) Search(ctx context.Context, series *pbv1.Series, filter index.Filter, order *pbv1.OrderBy, preloadSize int) (pbv1.SeriesList, error) { seriesList, err := s.searchPrimary(ctx, series) if err != nil { return nil, err @@ -176,7 +176,7 @@ func (s *seriesIndex) Search(ctx context.Context, series *pbv1.Series, filter in fieldKey := index.FieldKey{ IndexRuleID: order.Index.GetMetadata().Id, } - iter, err := s.store.Iterator(fieldKey, rangeOpts, order.Sort) + iter, err := s.store.Iterator(fieldKey, rangeOpts, order.Sort, preloadSize) if err != nil { return nil, err } diff --git a/banyand/internal/storage/storage.go b/banyand/internal/storage/storage.go index a2feb960..bce8c8b7 100644 --- a/banyand/internal/storage/storage.go +++ b/banyand/internal/storage/storage.go @@ -66,7 +66,7 @@ type SupplyTSDB[T TSTable] func() T // IndexDB is the interface of index database. type IndexDB interface { Write(docs index.Documents) error - Search(ctx context.Context, series *pbv1.Series, filter index.Filter, order *pbv1.OrderBy) (pbv1.SeriesList, error) + Search(ctx context.Context, series *pbv1.Series, filter index.Filter, order *pbv1.OrderBy, preloadSize int) (pbv1.SeriesList, error) } // TSDB allows listing and getting shard details. diff --git a/banyand/measure/query.go b/banyand/measure/query.go index cf34a914..b7da2ad6 100644 --- a/banyand/measure/query.go +++ b/banyand/measure/query.go @@ -37,6 +37,10 @@ import ( resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema" ) +const ( + preloadSize = 100 +) + // Query allow to retrieve measure data points. type Query interface { LoadGroup(name string) (resourceSchema.Group, bool) @@ -83,7 +87,7 @@ func (s *measure) Query(ctx context.Context, mqo pbv1.MeasureQueryOptions) (pbv1 } }() - sl, err := tsdb.IndexDB().Search(ctx, &pbv1.Series{Subject: mqo.Name, EntityValues: mqo.Entity}, mqo.Filter, mqo.Order) + sl, err := tsdb.IndexDB().Search(ctx, &pbv1.Series{Subject: mqo.Name, EntityValues: mqo.Entity}, mqo.Filter, mqo.Order, preloadSize) if err != nil { return nil, err } diff --git a/banyand/stream/index.go b/banyand/stream/index.go index 60f246f8..20c02de4 100644 --- a/banyand/stream/index.go +++ b/banyand/stream/index.go @@ -52,8 +52,8 @@ func newElementIndex(ctx context.Context, root string, flushTimeoutSeconds int64 return ei, nil } -func (e *elementIndex) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, order modelv1.Sort) (index.FieldIterator, error) { - iter, err := e.store.Iterator(fieldKey, termRange, order) +func (e *elementIndex) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, order modelv1.Sort, preloadSize int) (index.FieldIterator, error) { + iter, err := e.store.Iterator(fieldKey, termRange, order, preloadSize) if err != nil { return nil, err } diff --git a/banyand/stream/iter_builder.go b/banyand/stream/iter_builder.go index f5363bce..775a910e 100644 --- a/banyand/stream/iter_builder.go +++ b/banyand/stream/iter_builder.go @@ -20,8 +20,6 @@ package stream import ( "time" - "github.com/pkg/errors" - "github.com/apache/skywalking-banyandb/api/common" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" @@ -32,10 +30,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/timestamp" ) -var ( - errUnspecifiedIndexType = errors.New("Unspecified index type") - rangeOpts = index.RangeOpts{} -) +var rangeOpts = index.RangeOpts{} type filterFn func(item item) bool @@ -48,6 +43,7 @@ type iterBuilder struct { tagProjection []pbv1.TagProjection seriesID common.SeriesID order modelv1.Sort + preloadSize int } func newIterBuilder(tableWrappers []storage.TSTableWrapper[*tsTable], id common.SeriesID, sso pbv1.StreamSortOptions) *iterBuilder { @@ -60,6 +56,7 @@ func newIterBuilder(tableWrappers []storage.TSTableWrapper[*tsTable], id common. tagProjection: sso.TagProjection, seriesID: id, order: sso.Order.Sort, + preloadSize: sso.MaxElementSize, } } @@ -94,7 +91,7 @@ func buildSeriesByIndex(s *iterBuilder) (series []*searcherIterator, err error) IndexRuleID: s.indexRuleForSorting.GetMetadata().GetId(), Analyzer: s.indexRuleForSorting.GetAnalyzer(), } - inner, err = tw.Table().Index().Iterator(fieldKey, rangeOpts, s.order) + inner, err = tw.Table().Index().Iterator(fieldKey, rangeOpts, s.order, s.preloadSize) if err != nil { return nil, err } diff --git a/banyand/tsdb/series_seek_sort.go b/banyand/tsdb/series_seek_sort.go index 4ea8a2c3..604d5dff 100644 --- a/banyand/tsdb/series_seek_sort.go +++ b/banyand/tsdb/series_seek_sort.go @@ -18,6 +18,7 @@ package tsdb import ( + "math" "sort" "time" @@ -85,9 +86,9 @@ func (s *seekerBuilder) buildSeriesByIndex() (series []Iterator, err error) { } switch s.indexRuleForSorting.GetType() { case databasev1.IndexRule_TYPE_TREE: - inner, err = b.lsmIndexReader().Iterator(fieldKey, rangeOpts, s.order) + inner, err = b.lsmIndexReader().Iterator(fieldKey, rangeOpts, s.order, math.MaxInt) case databasev1.IndexRule_TYPE_INVERTED: - inner, err = b.invertedIndexReader().Iterator(fieldKey, rangeOpts, s.order) + inner, err = b.invertedIndexReader().Iterator(fieldKey, rangeOpts, s.order, math.MaxInt) case databasev1.IndexRule_TYPE_UNSPECIFIED: return nil, errors.WithMessagef(errUnspecifiedIndexType, "index rule:%v", s.indexRuleForSorting) } @@ -132,6 +133,7 @@ func (s *seekerBuilder) buildSeriesByTime() ([]Iterator, error) { }, termRange, s.order, + math.MaxInt, ) if err != nil { return nil, err diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go index 803f7d14..94614959 100644 --- a/banyand/tsdb/seriesdb.go +++ b/banyand/tsdb/seriesdb.go @@ -497,9 +497,9 @@ func (s *seriesDB) Search(ctx context.Context, path Path, filter index.Filter, o var err error switch order.Index.Type { case databasev1.IndexRule_TYPE_TREE: - iter, err = s.lsmIndex.Iterator(fieldKey, rangeOpts, order.Sort) + iter, err = s.lsmIndex.Iterator(fieldKey, rangeOpts, order.Sort, math.MaxInt) case databasev1.IndexRule_TYPE_INVERTED: - iter, err = s.invertedIndex.Iterator(fieldKey, rangeOpts, order.Sort) + iter, err = s.invertedIndex.Iterator(fieldKey, rangeOpts, order.Sort, math.MaxInt) default: return nil, errUnspecifiedIndexType } diff --git a/pkg/index/index.go b/pkg/index/index.go index 1796d66b..e776fa1c 100644 --- a/pkg/index/index.go +++ b/pkg/index/index.go @@ -187,8 +187,9 @@ func (i *dummyIterator) Close() error { // PostingValue is the collection of a field's values. type PostingValue struct { - Value posting.List - Term []byte + Value posting.List + Term []byte + TermRaw []byte } // Document represents a document in a index. @@ -215,7 +216,7 @@ type Writer interface { // FieldIterable allows building a FieldIterator. type FieldIterable interface { - Iterator(fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort) (iter FieldIterator, err error) + Iterator(fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort, preLoadSize int) (iter FieldIterator, err error) } // Searcher allows searching a field either by its key or by its key and term. diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go index 2dd0a887..28962031 100644 --- a/pkg/index/inverted/inverted.go +++ b/pkg/index/inverted/inverted.go @@ -55,8 +55,9 @@ const ( ) var ( - defaultUpper = convert.Uint64ToBytes(math.MaxUint64) - defaultLower = convert.Uint64ToBytes(0) + defaultUpper = convert.Uint64ToBytes(math.MaxUint64) + defaultLower = convert.Uint64ToBytes(0) + defaultRangePreloadSize = 1000 ) var analyzers map[databasev1.IndexRule_Analyzer]*analysis.Analyzer @@ -159,7 +160,7 @@ func (s *store) Write(fields []index.Field, docID uint64) error { return nil } -func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, order modelv1.Sort) (iter index.FieldIterator, err error) { +func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, order modelv1.Sort, preLoadSize int) (iter index.FieldIterator, err error) { if termRange.Lower != nil && termRange.Upper != nil && bytes.Compare(termRange.Lower, termRange.Upper) > 0 { @@ -212,7 +213,7 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, ord sortedKey: sortedKey, fk: fk, shouldDecodeTerm: shouldDecodeTerm, - size: 5, + size: preLoadSize, } return result, nil } @@ -244,7 +245,7 @@ func (s *store) MatchTerms(field index.Field) (list posting.List, err error) { if err != nil { return nil, err } - iter := newBlugeMatchIterator(documentMatchIterator, fk, shouldDecodeTerm, reader) + iter := newBlugeMatchIterator(documentMatchIterator, fk, shouldDecodeTerm, 0, reader) defer func() { err = multierr.Append(err, iter.Close()) }() @@ -277,7 +278,7 @@ func (s *store) Match(fieldKey index.FieldKey, matches []string) (posting.List, if err != nil { return nil, err } - iter := newBlugeMatchIterator(documentMatchIterator, fk, false, reader) + iter := newBlugeMatchIterator(documentMatchIterator, fk, false, 0, reader) defer func() { err = multierr.Append(err, iter.Close()) }() @@ -289,7 +290,7 @@ func (s *store) Match(fieldKey index.FieldKey, matches []string) (posting.List, } func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posting.List, err error) { - iter, err := s.Iterator(fieldKey, opts, modelv1.Sort_SORT_ASC) + iter, err := s.Iterator(fieldKey, opts, modelv1.Sort_SORT_ASC, defaultRangePreloadSize) if err != nil { return roaring.DummyPostingList, err } @@ -426,14 +427,17 @@ type blugeMatchIterator struct { fieldKey string shouldDecodeTerm bool closed bool + num int + skip int } -func newBlugeMatchIterator(delegated search.DocumentMatchIterator, fieldKey string, shouldDecodeTerm bool, closer io.Closer) blugeMatchIterator { +func newBlugeMatchIterator(delegated search.DocumentMatchIterator, fieldKey string, shouldDecodeTerm bool, skip int, closer io.Closer) blugeMatchIterator { return blugeMatchIterator{ delegated: delegated, fieldKey: fieldKey, shouldDecodeTerm: shouldDecodeTerm, closer: closer, + skip: skip, } } @@ -466,9 +470,13 @@ func (bmi *blugeMatchIterator) nextTerm() bool { } return false } + bmi.num++ + if bmi.num <= bmi.skip { + return true + } i := 0 var docID uint64 - var term []byte + var term, termRaw []byte bmi.err = match.VisitStoredFields(func(field string, value []byte) bool { if field == docIDField { if len(value) == 8 { @@ -482,6 +490,7 @@ func (bmi *blugeMatchIterator) nextTerm() bool { if field == bmi.fieldKey { v := y.Copy(value) if bmi.shouldDecodeTerm { + termRaw = v term = index.UnmarshalTerm(v) } else { term = v @@ -500,8 +509,9 @@ func (bmi *blugeMatchIterator) nextTerm() bool { } if bmi.agg == nil { bmi.agg = &index.PostingValue{ - Term: term, - Value: roaring.NewPostingListWithInitialData(docID), + Term: term, + TermRaw: termRaw, + Value: roaring.NewPostingListWithInitialData(docID), } return true } @@ -511,8 +521,9 @@ func (bmi *blugeMatchIterator) nextTerm() bool { } bmi.current = bmi.agg bmi.agg = &index.PostingValue{ - Term: term, - Value: roaring.NewPostingListWithInitialData(docID), + Term: term, + TermRaw: termRaw, + Value: roaring.NewPostingListWithInitialData(docID), } return false } diff --git a/pkg/index/inverted/sort.go b/pkg/index/inverted/sort.go index 7aa3dfa5..60c434ae 100644 --- a/pkg/index/inverted/sort.go +++ b/pkg/index/inverted/sort.go @@ -19,27 +19,29 @@ package inverted import ( + "bytes" "context" "errors" "io" - "strings" + "math" - "github.com/apache/skywalking-banyandb/pkg/index" "github.com/blugelabs/bluge" + + "github.com/apache/skywalking-banyandb/pkg/index" ) type sortIterator struct { query bluge.Query + err error reader *bluge.Reader + current *blugeMatchIterator sortedKey string fk string - shouldDecodeTerm bool + lastKey []byte + currKey []byte size int - - current *blugeMatchIterator - lastKey []byte - - err error + skipped int + shouldDecodeTerm bool } func (si *sortIterator) Next() bool { @@ -50,8 +52,7 @@ func (si *sortIterator) Next() bool { return si.loadCurrent() } - if si.current.Next() { - si.lastKey = si.current.current.Term + if si.next() { return true } si.current.Close() @@ -59,13 +60,14 @@ func (si *sortIterator) Next() bool { } func (si *sortIterator) loadCurrent() bool { - topNSearch := bluge.NewTopNSearch(si.size, si.query).SortBy([]string{si.sortedKey}) + size := si.size + si.skipped + if size < 0 { + // overflow + size = math.MaxInt64 + } + topNSearch := bluge.NewTopNSearch(size, si.query).SortBy([]string{si.sortedKey}) if si.lastKey != nil { - if strings.HasPrefix(si.sortedKey, "-") { - topNSearch = topNSearch.Before([][]byte{si.lastKey}) - } else { - topNSearch = topNSearch.After([][]byte{si.lastKey}) - } + topNSearch = topNSearch.After([][]byte{si.lastKey}) } documentMatchIterator, err := si.reader.Search(context.Background(), topNSearch) @@ -74,15 +76,27 @@ func (si *sortIterator) loadCurrent() bool { return false } - iter := newBlugeMatchIterator(documentMatchIterator, si.fk, si.shouldDecodeTerm, nil) + iter := newBlugeMatchIterator(documentMatchIterator, si.fk, si.shouldDecodeTerm, si.skipped, nil) si.current = &iter + if si.next() { + return true + } + si.err = io.EOF + return false +} + +func (si *sortIterator) next() bool { if si.current.Next() { - si.lastKey = si.current.current.Term + currKey := si.current.Val().TermRaw + if si.currKey != nil && !bytes.Equal(currKey, si.currKey) { + si.lastKey = si.currKey + si.skipped = 0 + } + si.currKey = currKey + si.skipped += si.current.Val().Value.Len() return true - } else { - si.err = io.EOF - return false } + return false } func (si *sortIterator) Val() *index.PostingValue { diff --git a/pkg/index/lsm/search.go b/pkg/index/lsm/search.go index daa21677..eba435cb 100644 --- a/pkg/index/lsm/search.go +++ b/pkg/index/lsm/search.go @@ -19,6 +19,7 @@ package lsm import ( "bytes" + "math" "github.com/pkg/errors" "go.uber.org/multierr" @@ -50,7 +51,7 @@ func (s *store) MatchTerms(field index.Field) (list posting.List, err error) { } func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posting.List, err error) { - iter, err := s.Iterator(fieldKey, opts, modelv1.Sort_SORT_ASC) + iter, err := s.Iterator(fieldKey, opts, modelv1.Sort_SORT_ASC, math.MaxInt) if err != nil { return roaring.DummyPostingList, err } @@ -62,7 +63,7 @@ func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posti return } -func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, order modelv1.Sort) (index.FieldIterator, error) { +func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, order modelv1.Sort, _ int) (index.FieldIterator, error) { if !s.closer.AddRunning() { return nil, errors.New("lsm index store is closed") } diff --git a/pkg/index/testcases/duration.go b/pkg/index/testcases/duration.go index 2a61322b..ebbc9d70 100644 --- a/pkg/index/testcases/duration.go +++ b/pkg/index/testcases/duration.go @@ -19,6 +19,7 @@ package testcases import ( + "fmt" "sort" "testing" @@ -57,8 +58,6 @@ type result struct { // RunDuration executes duration related cases. func RunDuration(t *testing.T, data map[int]posting.List, store SimpleStore) { - tester := assert.New(t) - is := require.New(t) tests := []struct { name string want []int @@ -262,9 +261,34 @@ func RunDuration(t *testing.T, data map[int]posting.List, store SimpleStore) { }, }, } - for _, tt := range tests { + preLoadSizes := []int{7, 20, 50} + allTests := make([]struct { + name string + want []int + args args + preloadSize int + }, 0, len(tests)*len(preLoadSizes)) + + for _, size := range preLoadSizes { + for _, t := range tests { + allTests = append(allTests, struct { + name string + want []int + args args + preloadSize int + }{ + name: t.name + " preLoadSize " + fmt.Sprint(size), + want: t.want, + preloadSize: size, + args: t.args, + }) + } + } + for _, tt := range allTests { t.Run(tt.name, func(t *testing.T) { - iter, err := store.Iterator(tt.args.fieldKey, tt.args.termRange, tt.args.orderType) + tester := assert.New(t) + is := require.New(t) + iter, err := store.Iterator(tt.args.fieldKey, tt.args.termRange, tt.args.orderType, tt.preloadSize) is.NoError(err) if iter == nil { tester.Empty(tt.want) @@ -278,11 +302,20 @@ func RunDuration(t *testing.T, data map[int]posting.List, store SimpleStore) { }() is.NotNil(iter) got := make([]result, 0) + var currResult result for iter.Next() { - got = append(got, result{ - key: int(convert.BytesToInt64(iter.Val().Term)), - items: toArray(iter.Val().Value), - }) + key := int(convert.BytesToInt64(iter.Val().Term)) + if currResult.key != key { + if currResult.key != 0 { + got = append(got, currResult) + currResult = result{} + } + currResult.key = key + } + currResult.items = append(currResult.items, toArray(iter.Val().Value)...) + } + if len(currResult.items) > 0 { + got = append(got, currResult) } for i := 0; i < 10; i++ { is.False(iter.Next()) diff --git a/test/stress/trace/trace-duration/data.csv b/test/stress/trace/trace-duration/data.csv deleted file mode 100644 index d982e51c..00000000 --- a/test/stress/trace/trace-duration/data.csv +++ /dev/null @@ -1,29 +0,0 @@ -0.015663 -0.014582 -0.015041 -0.013952 -0.015416 -0.014160 -0.013451 -0.013345 -0.013596 -0.034798 -0.013068 -0.013051 -0.013199 -0.013267 -0.013499 -0.012751 -0.013335 -0.013282 -0.012895 -0.013164 -0.012693 -0.012857 -0.012879 -0.014324 -0.013011 -0.013627 -0.017495 -0.012770 -0.012918 diff --git a/test/stress/trace/trace-duration/result.csv b/test/stress/trace/trace-duration/result.csv deleted file mode 100644 index c1cd517a..00000000 --- a/test/stress/trace/trace-duration/result.csv +++ /dev/null @@ -1,2 +0,0 @@ -Metric Name, Min, Max, Mean, Median, P95 -result, 0.012693, 0.034798, 0.014417, 0.013335, 0.016579 diff --git a/test/stress/trace/trace_suite_test.go b/test/stress/trace/trace_suite_test.go index 85c07123..a749a236 100644 --- a/test/stress/trace/trace_suite_test.go +++ b/test/stress/trace/trace_suite_test.go @@ -31,6 +31,7 @@ import ( ) func TestIntegrationLoad(t *testing.T) { + t.Skip("Skip the stress test") RegisterFailHandler(Fail) RunSpecs(t, "Stress Trace Suite") }