hanahmily commented on code in PR #459:
URL: 
https://github.com/apache/skywalking-banyandb/pull/459#discussion_r1625202773


##########
banyand/stream/block.go:
##########
@@ -446,9 +446,21 @@ func (bc *blockCursor) init(p *part, bm *blockMetadata, 
opts queryOptions) {
        bc.minTimestamp = opts.minTimestamp
        bc.maxTimestamp = opts.maxTimestamp
        bc.tagProjection = opts.TagProjection
-       if opts.elementRefMap != nil {
+       if opts.sortedRefMap != nil {
                seriesID := bc.bm.seriesID
-               bc.expectedTimestamps = opts.elementRefMap[seriesID]
+               for tw := range opts.sortedRefMap {
+                       timeRange := tw.GetTimeRange()
+                       min, max := bm.timestamps.min, bm.timestamps.max
+                       if min >= timeRange.Start.UnixNano() && max <= 
timeRange.End.UnixNano() {

Review Comment:
   ```suggestion
                        if min <= timeRange.End.UnixNano() && max >= 
timeRange.Start.UnixNano() {
   ```



##########
pkg/query/executor/interface.go:
##########
@@ -31,7 +31,6 @@ import (
 // StreamExecutionContext allows retrieving data through the stream module.
 type StreamExecutionContext interface {
        Query(ctx context.Context, opts pbv1.StreamQueryOptions) 
(pbv1.StreamQueryResult, error)
-       Sort(ctx context.Context, opts pbv1.StreamQueryOptions) 
(pbv1.StreamSortResult, error)
        Filter(ctx context.Context, opts pbv1.StreamQueryOptions) 
(pbv1.StreamQueryResult, error)

Review Comment:
   Please merge `Query` and `Filter` into a single method, since their 
signatures are identical.



##########
banyand/stream/query.go:
##########
@@ -18,27 +18,26 @@
 package stream
 
 import (
+       "bytes"
        "container/heap"
        "context"
        "errors"
        "fmt"
        "sort"
 
-       "go.uber.org/multierr"
-
        "github.com/apache/skywalking-banyandb/api/common"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/pkg/convert"
-       itersort "github.com/apache/skywalking-banyandb/pkg/iter/sort"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/partition"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 )
 
 type queryOptions struct {
-       elementRefMap map[common.SeriesID][]int64
+       filteredRefMap map[common.SeriesID][]int64

Review Comment:
   Could you please define a custom type for `map[common.SeriesID][]int64`, as 
this type will also be referenced by sortedRefMap?



##########
banyand/stream/query.go:
##########
@@ -18,27 +18,26 @@
 package stream
 
 import (
+       "bytes"
        "container/heap"
        "context"
        "errors"
        "fmt"
        "sort"
 
-       "go.uber.org/multierr"
-
        "github.com/apache/skywalking-banyandb/api/common"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/pkg/convert"
-       itersort "github.com/apache/skywalking-banyandb/pkg/iter/sort"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/partition"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 )
 
 type queryOptions struct {
-       elementRefMap map[common.SeriesID][]int64
+       filteredRefMap map[common.SeriesID][]int64

Review Comment:
   Use `posting.List` instead of `[]int64`. It provides a more compact memory 
footprint.



##########
banyand/stream/query.go:
##########
@@ -670,13 +556,119 @@ func (s *stream) Filter(ctx context.Context, sqo 
pbv1.StreamQueryOptions) (sqr p
                        }
                }
        }
-       result.orderByTS = true
        if sqo.Order == nil {
-               result.ascTS = true
+               result.orderByTS = true
+               result.asc = true
                return &result, nil
        }
+       if sqo.Order.Index == nil {
+               result.orderByTS = true
+       } else {
+               result.sortedTagLocation = sortedTagLocation
+       }
        if sqo.Order.Sort == modelv1.Sort_SORT_ASC || sqo.Order.Sort == 
modelv1.Sort_SORT_UNSPECIFIED {
-               result.ascTS = true
+               result.asc = true
        }
        return &result, nil
 }
+
+func indexSearch(ctx context.Context, sqo pbv1.StreamQueryOptions,
+       tabWrappers []storage.TSTableWrapper[*tsTable], seriesList 
pbv1.SeriesList,
+) (map[common.SeriesID][]int64, error) {
+       if sqo.Filter == nil {
+               return nil, nil
+       }
+       var filteredRefList []elementRef
+       for _, tw := range tabWrappers {
+               index := tw.Table().Index()
+               erl, err := index.Search(ctx, seriesList, sqo.Filter, 
sqo.TimeRange)
+               if err != nil {
+                       return nil, err
+               }
+               filteredRefList = append(filteredRefList, erl...)
+       }
+       filteredRefMap := make(map[common.SeriesID][]int64)
+       if len(filteredRefList) != 0 {
+               for _, ref := range filteredRefList {
+                       if _, ok := filteredRefMap[ref.seriesID]; !ok {
+                               filteredRefMap[ref.seriesID] = 
[]int64{ref.timestamp}
+                       } else {
+                               filteredRefMap[ref.seriesID] = 
append(filteredRefMap[ref.seriesID], ref.timestamp)
+                       }
+               }
+       }
+       return filteredRefMap, nil
+}
+
+func indexSort(s *stream, sqo pbv1.StreamQueryOptions, tabWrappers 
[]storage.TSTableWrapper[*tsTable],
+       seriesList pbv1.SeriesList, filteredRefMap map[common.SeriesID][]int64,
+) (map[storage.TSTableWrapper[*tsTable]]map[common.SeriesID][]int64, 
*tagLocation, error) {
+       if sqo.Order == nil || sqo.Order.Index == nil {
+               return nil, nil, nil
+       }
+       elementRefCount := 0
+       sortedRefMap := 
make(map[storage.TSTableWrapper[*tsTable]]map[common.SeriesID][]int64)
+       iters, stl, err := s.buildItersByIndex(tabWrappers, seriesList, sqo)
+       if err != nil {
+               return nil, nil, err
+       }
+       for {

Review Comment:
   Please use `sort.Iterator` to implement the merge sort algorithm.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to