This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch sort in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit c9dbd96147c03c98a642df37efc9a93ded167741 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Sun Mar 24 23:45:27 2024 +0000 Add a cursor to sort index scan Signed-off-by: Gao Hongtao <hanahm...@gmail.com> --- banyand/stream/iter_builder.go | 9 +-- pkg/index/inverted/inverted.go | 16 +++-- pkg/index/inverted/sort.go | 98 +++++++++++++++++++++++++++++ pkg/iter/sort/sort.go | 8 +-- test/stress/trace/Makefile | 11 +++- test/stress/trace/trace-duration/data.csv | 29 +++++++++ test/stress/trace/trace-duration/result.csv | 2 + test/stress/trace/trace_suite_test.go | 1 - 8 files changed, 153 insertions(+), 21 deletions(-) diff --git a/banyand/stream/iter_builder.go b/banyand/stream/iter_builder.go index 8c7a9d7b..f5363bce 100644 --- a/banyand/stream/iter_builder.go +++ b/banyand/stream/iter_builder.go @@ -94,14 +94,7 @@ func buildSeriesByIndex(s *iterBuilder) (series []*searcherIterator, err error) IndexRuleID: s.indexRuleForSorting.GetMetadata().GetId(), Analyzer: s.indexRuleForSorting.GetAnalyzer(), } - switch s.indexRuleForSorting.GetType() { - case databasev1.IndexRule_TYPE_TREE: - inner, err = tw.Table().Index().Iterator(fieldKey, rangeOpts, s.order) - case databasev1.IndexRule_TYPE_INVERTED: - inner, err = tw.Table().Index().Iterator(fieldKey, rangeOpts, s.order) - case databasev1.IndexRule_TYPE_UNSPECIFIED: - return nil, errors.WithMessagef(errUnspecifiedIndexType, "index rule:%v", s.indexRuleForSorting) - } + inner, err = tw.Table().Index().Iterator(fieldKey, rangeOpts, s.order) if err != nil { return nil, err } diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go index 9f4c6cc5..2dd0a887 100644 --- a/pkg/index/inverted/inverted.go +++ b/pkg/index/inverted/inverted.go @@ -206,12 +206,15 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, ord if order == modelv1.Sort_SORT_DESC { sortedKey = "-" + sortedKey } - documentMatchIterator, err := reader.Search(context.Background(), 
bluge.NewTopNSearch(math.MaxInt64, query).SortBy([]string{sortedKey})) - if err != nil { - return nil, multierr.Combine(err, reader.Close()) + result := &sortIterator{ + query: query, + reader: reader, + sortedKey: sortedKey, + fk: fk, + shouldDecodeTerm: shouldDecodeTerm, + size: 5, } - result := newBlugeMatchIterator(documentMatchIterator, fk, shouldDecodeTerm, reader) - return &result, nil + return result, nil } func (s *store) MatchField(fieldKey index.FieldKey) (list posting.List, err error) { @@ -520,5 +523,8 @@ func (bmi *blugeMatchIterator) Val() *index.PostingValue { func (bmi *blugeMatchIterator) Close() error { bmi.closed = true + if bmi.closer == nil { + return bmi.err + } return errors.Join(bmi.err, bmi.closer.Close()) } diff --git a/pkg/index/inverted/sort.go b/pkg/index/inverted/sort.go new file mode 100644 index 00000000..7aa3dfa5 --- /dev/null +++ b/pkg/index/inverted/sort.go @@ -0,0 +1,98 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package inverted implements an inverted index repository. 
+package inverted + +import ( + "context" + "errors" + "io" + "strings" + + "github.com/apache/skywalking-banyandb/pkg/index" + "github.com/blugelabs/bluge" +) + +type sortIterator struct { + query bluge.Query + reader *bluge.Reader + sortedKey string + fk string + shouldDecodeTerm bool + size int + + current *blugeMatchIterator + lastKey []byte + + err error +} + +func (si *sortIterator) Next() bool { + if si.err != nil { + return false + } + if si.current == nil { + return si.loadCurrent() + } + + if si.current.Next() { + si.lastKey = si.current.current.Term + return true + } + si.current.Close() + return si.loadCurrent() +} + +func (si *sortIterator) loadCurrent() bool { + topNSearch := bluge.NewTopNSearch(si.size, si.query).SortBy([]string{si.sortedKey}) + if si.lastKey != nil { + if strings.HasPrefix(si.sortedKey, "-") { + topNSearch = topNSearch.Before([][]byte{si.lastKey}) + } else { + topNSearch = topNSearch.After([][]byte{si.lastKey}) + } + } + + documentMatchIterator, err := si.reader.Search(context.Background(), topNSearch) + if err != nil { + si.err = err + return false + } + + iter := newBlugeMatchIterator(documentMatchIterator, si.fk, si.shouldDecodeTerm, nil) + si.current = &iter + if si.current.Next() { + si.lastKey = si.current.current.Term + return true + } else { + si.err = io.EOF + return false + } +} + +func (si *sortIterator) Val() *index.PostingValue { + return si.current.Val() +} + +func (si *sortIterator) Close() error { + if errors.Is(si.err, io.EOF) { + si.err = nil + return errors.Join(si.current.Close(), si.reader.Close()) + } + return errors.Join(si.err, si.current.Close(), si.reader.Close()) +} diff --git a/pkg/iter/sort/sort.go b/pkg/iter/sort/sort.go index b0d67eb5..f8aa1f0c 100644 --- a/pkg/iter/sort/sort.go +++ b/pkg/iter/sort/sort.go @@ -21,7 +21,8 @@ package sort import ( "bytes" "container/heap" - "fmt" + + "go.uber.org/multierr" ) // Comparable is an interface that allows sorting of items. 
@@ -124,10 +125,7 @@ func (it *itemIter[T]) Val() T { func (it *itemIter[T]) Close() error { var err error for _, iter := range it.iters { - if e := iter.Close(); e != nil { - fmt.Println("Error closing iterator:", e) - err = e - } + err = multierr.Append(err, iter.Close()) } return err } diff --git a/test/stress/trace/Makefile b/test/stress/trace/Makefile index f331107e..27c9cbe6 100644 --- a/test/stress/trace/Makefile +++ b/test/stress/trace/Makefile @@ -15,6 +15,13 @@ # specific language governing permissions and limitations # under the License. # +mk_path := $(abspath $(lastword $(MAKEFILE_LIST))) +mk_dir := $(dir $(mk_path)) +root_dir := $(abspath $(mk_dir)/../../..) +tool_bin := $(root_dir)/bin + +include $(root_dir)/scripts/build/version.mk +include $(root_dir)/scripts/build/ginkgo.mk NAME := stress @@ -59,6 +66,6 @@ rm_traffic: curl -XDELETE 'http://localhost:12800/mock-data/segments/tasks' .PHONY: test_query -test_query: - go test -v -timeout 1h -run TestQuery ./... +test_query: $(GINKGO) + $(GINKGO) -v -timeout 1h TestQuery ./... 
\ No newline at end of file diff --git a/test/stress/trace/trace-duration/data.csv b/test/stress/trace/trace-duration/data.csv new file mode 100644 index 00000000..d982e51c --- /dev/null +++ b/test/stress/trace/trace-duration/data.csv @@ -0,0 +1,29 @@ +0.015663 +0.014582 +0.015041 +0.013952 +0.015416 +0.014160 +0.013451 +0.013345 +0.013596 +0.034798 +0.013068 +0.013051 +0.013199 +0.013267 +0.013499 +0.012751 +0.013335 +0.013282 +0.012895 +0.013164 +0.012693 +0.012857 +0.012879 +0.014324 +0.013011 +0.013627 +0.017495 +0.012770 +0.012918 diff --git a/test/stress/trace/trace-duration/result.csv b/test/stress/trace/trace-duration/result.csv new file mode 100644 index 00000000..c1cd517a --- /dev/null +++ b/test/stress/trace/trace-duration/result.csv @@ -0,0 +1,2 @@ +Metric Name, Min, Max, Mean, Median, P95 +result, 0.012693, 0.034798, 0.014417, 0.013335, 0.016579 diff --git a/test/stress/trace/trace_suite_test.go b/test/stress/trace/trace_suite_test.go index c450a8db..85c07123 100644 --- a/test/stress/trace/trace_suite_test.go +++ b/test/stress/trace/trace_suite_test.go @@ -31,7 +31,6 @@ import ( ) func TestIntegrationLoad(t *testing.T) { - t.Skip("Skip the stress trace test") RegisterFailHandler(Fail) RunSpecs(t, "Stress Trace Suite") }