This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch sort
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit c9dbd96147c03c98a642df37efc9a93ded167741
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Sun Mar 24 23:45:27 2024 +0000

    Add a cursor to sort index scan
    
    Signed-off-by: Gao Hongtao <hanahm...@gmail.com>
---
 banyand/stream/iter_builder.go              |  9 +--
 pkg/index/inverted/inverted.go              | 16 +++--
 pkg/index/inverted/sort.go                  | 98 +++++++++++++++++++++++++++++
 pkg/iter/sort/sort.go                       |  8 +--
 test/stress/trace/Makefile                  | 11 +++-
 test/stress/trace/trace-duration/data.csv   | 29 +++++++++
 test/stress/trace/trace-duration/result.csv |  2 +
 test/stress/trace/trace_suite_test.go       |  1 -
 8 files changed, 153 insertions(+), 21 deletions(-)

diff --git a/banyand/stream/iter_builder.go b/banyand/stream/iter_builder.go
index 8c7a9d7b..f5363bce 100644
--- a/banyand/stream/iter_builder.go
+++ b/banyand/stream/iter_builder.go
@@ -94,14 +94,7 @@ func buildSeriesByIndex(s *iterBuilder) (series []*searcherIterator, err error)
                        IndexRuleID: s.indexRuleForSorting.GetMetadata().GetId(),
                        Analyzer:    s.indexRuleForSorting.GetAnalyzer(),
                }
-               switch s.indexRuleForSorting.GetType() {
-               case databasev1.IndexRule_TYPE_TREE:
-                       inner, err = tw.Table().Index().Iterator(fieldKey, rangeOpts, s.order)
-               case databasev1.IndexRule_TYPE_INVERTED:
-                       inner, err = tw.Table().Index().Iterator(fieldKey, rangeOpts, s.order)
-               case databasev1.IndexRule_TYPE_UNSPECIFIED:
-                       return nil, errors.WithMessagef(errUnspecifiedIndexType, "index rule:%v", s.indexRuleForSorting)
-               }
+               inner, err = tw.Table().Index().Iterator(fieldKey, rangeOpts, s.order)
                if err != nil {
                        return nil, err
                }
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 9f4c6cc5..2dd0a887 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -206,12 +206,15 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, ord
        if order == modelv1.Sort_SORT_DESC {
                sortedKey = "-" + sortedKey
        }
-       documentMatchIterator, err := reader.Search(context.Background(), bluge.NewTopNSearch(math.MaxInt64, query).SortBy([]string{sortedKey}))
-       if err != nil {
-               return nil, multierr.Combine(err, reader.Close())
+       result := &sortIterator{
+               query:            query,
+               reader:           reader,
+               sortedKey:        sortedKey,
+               fk:               fk,
+               shouldDecodeTerm: shouldDecodeTerm,
+               size:             5,
        }
-       result := newBlugeMatchIterator(documentMatchIterator, fk, shouldDecodeTerm, reader)
-       return &result, nil
+       return result, nil
 }
 
 func (s *store) MatchField(fieldKey index.FieldKey) (list posting.List, err error) {
@@ -520,5 +523,8 @@ func (bmi *blugeMatchIterator) Val() *index.PostingValue {
 
 func (bmi *blugeMatchIterator) Close() error {
        bmi.closed = true
+       if bmi.closer == nil {
+               return bmi.err
+       }
        return errors.Join(bmi.err, bmi.closer.Close())
 }
diff --git a/pkg/index/inverted/sort.go b/pkg/index/inverted/sort.go
new file mode 100644
index 00000000..7aa3dfa5
--- /dev/null
+++ b/pkg/index/inverted/sort.go
@@ -0,0 +1,98 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package inverted implements an inverted index repository.
+package inverted
+
+import (
+       "context"
+       "errors"
+       "io"
+       "strings"
+
+       "github.com/apache/skywalking-banyandb/pkg/index"
+       "github.com/blugelabs/bluge"
+)
+
+type sortIterator struct {
+       query            bluge.Query
+       reader           *bluge.Reader
+       sortedKey        string
+       fk               string
+       shouldDecodeTerm bool
+       size             int
+
+       current *blugeMatchIterator
+       lastKey []byte
+
+       err error
+}
+
+func (si *sortIterator) Next() bool {
+       if si.err != nil {
+               return false
+       }
+       if si.current == nil {
+               return si.loadCurrent()
+       }
+
+       if si.current.Next() {
+               si.lastKey = si.current.current.Term
+               return true
+       }
+       si.current.Close()
+       return si.loadCurrent()
+}
+
+func (si *sortIterator) loadCurrent() bool {
+       topNSearch := bluge.NewTopNSearch(si.size, si.query).SortBy([]string{si.sortedKey})
+       if si.lastKey != nil {
+               if strings.HasPrefix(si.sortedKey, "-") {
+                       topNSearch = topNSearch.Before([][]byte{si.lastKey})
+               } else {
+                       topNSearch = topNSearch.After([][]byte{si.lastKey})
+               }
+       }
+
+       documentMatchIterator, err := si.reader.Search(context.Background(), topNSearch)
+       if err != nil {
+               si.err = err
+               return false
+       }
+
+       iter := newBlugeMatchIterator(documentMatchIterator, si.fk, si.shouldDecodeTerm, nil)
+       si.current = &iter
+       if si.current.Next() {
+               si.lastKey = si.current.current.Term
+               return true
+       } else {
+               si.err = io.EOF
+               return false
+       }
+}
+
+func (si *sortIterator) Val() *index.PostingValue {
+       return si.current.Val()
+}
+
+func (si *sortIterator) Close() error {
+       if errors.Is(si.err, io.EOF) {
+               si.err = nil
+               return errors.Join(si.current.Close(), si.reader.Close())
+       }
+       return errors.Join(si.err, si.current.Close(), si.reader.Close())
+}
diff --git a/pkg/iter/sort/sort.go b/pkg/iter/sort/sort.go
index b0d67eb5..f8aa1f0c 100644
--- a/pkg/iter/sort/sort.go
+++ b/pkg/iter/sort/sort.go
@@ -21,7 +21,8 @@ package sort
 import (
        "bytes"
        "container/heap"
-       "fmt"
+
+       "go.uber.org/multierr"
 )
 
 // Comparable is an interface that allows sorting of items.
@@ -124,10 +125,7 @@ func (it *itemIter[T]) Val() T {
 func (it *itemIter[T]) Close() error {
        var err error
        for _, iter := range it.iters {
-               if e := iter.Close(); e != nil {
-                       fmt.Println("Error closing iterator:", e)
-                       err = e
-               }
+               err = multierr.Append(err, iter.Close())
        }
        return err
 }
diff --git a/test/stress/trace/Makefile b/test/stress/trace/Makefile
index f331107e..27c9cbe6 100644
--- a/test/stress/trace/Makefile
+++ b/test/stress/trace/Makefile
@@ -15,6 +15,13 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+mk_path  := $(abspath $(lastword $(MAKEFILE_LIST)))
+mk_dir   := $(dir $(mk_path))
+root_dir := $(abspath $(mk_dir)/../../..)
+tool_bin := $(root_dir)/bin
+
+include $(root_dir)/scripts/build/version.mk
+include $(root_dir)/scripts/build/ginkgo.mk
 
 NAME := stress
 
@@ -59,6 +66,6 @@ rm_traffic:
        curl -XDELETE 'http://localhost:12800/mock-data/segments/tasks'
 
 .PHONY: test_query
-test_query:
-       go test -v -timeout 1h -run TestQuery ./...
+test_query: $(GINKGO)
+       $(GINKGO) -v -timeout 1h TestQuery ./...
        
\ No newline at end of file
diff --git a/test/stress/trace/trace-duration/data.csv b/test/stress/trace/trace-duration/data.csv
new file mode 100644
index 00000000..d982e51c
--- /dev/null
+++ b/test/stress/trace/trace-duration/data.csv
@@ -0,0 +1,29 @@
+0.015663
+0.014582
+0.015041
+0.013952
+0.015416
+0.014160
+0.013451
+0.013345
+0.013596
+0.034798
+0.013068
+0.013051
+0.013199
+0.013267
+0.013499
+0.012751
+0.013335
+0.013282
+0.012895
+0.013164
+0.012693
+0.012857
+0.012879
+0.014324
+0.013011
+0.013627
+0.017495
+0.012770
+0.012918
diff --git a/test/stress/trace/trace-duration/result.csv b/test/stress/trace/trace-duration/result.csv
new file mode 100644
index 00000000..c1cd517a
--- /dev/null
+++ b/test/stress/trace/trace-duration/result.csv
@@ -0,0 +1,2 @@
+Metric Name, Min, Max, Mean, Median, P95
+result, 0.012693, 0.034798, 0.014417, 0.013335, 0.016579
diff --git a/test/stress/trace/trace_suite_test.go b/test/stress/trace/trace_suite_test.go
index c450a8db..85c07123 100644
--- a/test/stress/trace/trace_suite_test.go
+++ b/test/stress/trace/trace_suite_test.go
@@ -31,7 +31,6 @@ import (
 )
 
 func TestIntegrationLoad(t *testing.T) {
-       t.Skip("Skip the stress trace test")
        RegisterFailHandler(Fail)
        RunSpecs(t, "Stress Trace Suite")
 }

Reply via email to