hanahmily commented on code in PR #459:
URL:
https://github.com/apache/skywalking-banyandb/pull/459#discussion_r1625202773
##########
banyand/stream/block.go:
##########
@@ -446,9 +446,21 @@ func (bc *blockCursor) init(p *part, bm *blockMetadata,
opts queryOptions) {
bc.minTimestamp = opts.minTimestamp
bc.maxTimestamp = opts.maxTimestamp
bc.tagProjection = opts.TagProjection
- if opts.elementRefMap != nil {
+ if opts.sortedRefMap != nil {
seriesID := bc.bm.seriesID
- bc.expectedTimestamps = opts.elementRefMap[seriesID]
+ for tw := range opts.sortedRefMap {
+ timeRange := tw.GetTimeRange()
+ min, max := bm.timestamps.min, bm.timestamps.max
+ if min >= timeRange.Start.UnixNano() && max <=
timeRange.End.UnixNano() {
Review Comment:
```suggestion
if min <= timeRange.End.UnixNano() && max >=
timeRange.Start.UnixNano() {
```
##########
pkg/query/executor/interface.go:
##########
@@ -31,7 +31,6 @@ import (
// StreamExecutionContext allows retrieving data through the stream module.
type StreamExecutionContext interface {
Query(ctx context.Context, opts pbv1.StreamQueryOptions)
(pbv1.StreamQueryResult, error)
- Sort(ctx context.Context, opts pbv1.StreamQueryOptions)
(pbv1.StreamSortResult, error)
Filter(ctx context.Context, opts pbv1.StreamQueryOptions)
(pbv1.StreamQueryResult, error)
Review Comment:
Please merge `Query` and `Filter` into a single method, since their signatures are identical.
##########
banyand/stream/query.go:
##########
@@ -18,27 +18,26 @@
package stream
import (
+ "bytes"
"container/heap"
"context"
"errors"
"fmt"
"sort"
- "go.uber.org/multierr"
-
"github.com/apache/skywalking-banyandb/api/common"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/pkg/convert"
- itersort "github.com/apache/skywalking-banyandb/pkg/iter/sort"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
)
type queryOptions struct {
- elementRefMap map[common.SeriesID][]int64
+ filteredRefMap map[common.SeriesID][]int64
Review Comment:
Could you please define a custom type, as this type will be referenced by
sortedRefMap?
##########
banyand/stream/query.go:
##########
@@ -18,27 +18,26 @@
package stream
import (
+ "bytes"
"container/heap"
"context"
"errors"
"fmt"
"sort"
- "go.uber.org/multierr"
-
"github.com/apache/skywalking-banyandb/api/common"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/pkg/convert"
- itersort "github.com/apache/skywalking-banyandb/pkg/iter/sort"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
)
type queryOptions struct {
- elementRefMap map[common.SeriesID][]int64
+ filteredRefMap map[common.SeriesID][]int64
Review Comment:
Use `posting.List` instead of `[]int64`. It provides a more compact memory
footprint.
##########
banyand/stream/query.go:
##########
@@ -670,13 +556,119 @@ func (s *stream) Filter(ctx context.Context, sqo
pbv1.StreamQueryOptions) (sqr p
}
}
}
- result.orderByTS = true
if sqo.Order == nil {
- result.ascTS = true
+ result.orderByTS = true
+ result.asc = true
return &result, nil
}
+ if sqo.Order.Index == nil {
+ result.orderByTS = true
+ } else {
+ result.sortedTagLocation = sortedTagLocation
+ }
if sqo.Order.Sort == modelv1.Sort_SORT_ASC || sqo.Order.Sort ==
modelv1.Sort_SORT_UNSPECIFIED {
- result.ascTS = true
+ result.asc = true
}
return &result, nil
}
+
+func indexSearch(ctx context.Context, sqo pbv1.StreamQueryOptions,
+ tabWrappers []storage.TSTableWrapper[*tsTable], seriesList
pbv1.SeriesList,
+) (map[common.SeriesID][]int64, error) {
+ if sqo.Filter == nil {
+ return nil, nil
+ }
+ var filteredRefList []elementRef
+ for _, tw := range tabWrappers {
+ index := tw.Table().Index()
+ erl, err := index.Search(ctx, seriesList, sqo.Filter,
sqo.TimeRange)
+ if err != nil {
+ return nil, err
+ }
+ filteredRefList = append(filteredRefList, erl...)
+ }
+ filteredRefMap := make(map[common.SeriesID][]int64)
+ if len(filteredRefList) != 0 {
+ for _, ref := range filteredRefList {
+ if _, ok := filteredRefMap[ref.seriesID]; !ok {
+ filteredRefMap[ref.seriesID] =
[]int64{ref.timestamp}
+ } else {
+ filteredRefMap[ref.seriesID] =
append(filteredRefMap[ref.seriesID], ref.timestamp)
+ }
+ }
+ }
+ return filteredRefMap, nil
+}
+
+func indexSort(s *stream, sqo pbv1.StreamQueryOptions, tabWrappers
[]storage.TSTableWrapper[*tsTable],
+ seriesList pbv1.SeriesList, filteredRefMap map[common.SeriesID][]int64,
+) (map[storage.TSTableWrapper[*tsTable]]map[common.SeriesID][]int64,
*tagLocation, error) {
+ if sqo.Order == nil || sqo.Order.Index == nil {
+ return nil, nil, nil
+ }
+ elementRefCount := 0
+ sortedRefMap :=
make(map[storage.TSTableWrapper[*tsTable]]map[common.SeriesID][]int64)
+ iters, stl, err := s.buildItersByIndex(tabWrappers, seriesList, sqo)
+ if err != nil {
+ return nil, nil, err
+ }
+ for {
Review Comment:
Please use `sort.Iterator` to implement the merge sort algorithm.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]