This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push: new 2f37a17c Improve sorting performance of Stream (#459) 2f37a17c is described below commit 2f37a17c873a91f654a2561c6673012ea4d42243 Author: Huang Youliang <butterbright0...@gmail.com> AuthorDate: Mon Jun 17 10:23:34 2024 +0800 Improve sorting performance of Stream (#459) * Improve sorting performance of Stream --------- Co-authored-by: Gao Hongtao <hanahm...@gmail.com> --- CHANGES.md | 1 + banyand/internal/storage/index.go | 6 +- banyand/stream/benchmark_test.go | 6 +- banyand/stream/block.go | 20 +- banyand/stream/index.go | 38 +- banyand/stream/iter.go | 148 ------- banyand/stream/iter_builder.go | 124 ------ banyand/stream/part.go | 162 -------- banyand/stream/query.go | 450 +++++++++------------ banyand/stream/query_test.go | 114 +----- banyand/stream/stream.go | 2 - banyand/stream/tstable.go | 20 - pkg/index/index.go | 25 +- pkg/index/inverted/inverted.go | 19 +- pkg/index/inverted/sort.go | 4 +- pkg/index/inverted/sort_test.go | 8 +- pkg/index/testcases/duration.go | 3 +- pkg/pb/v1/metadata.go | 14 +- pkg/query/executor/interface.go | 2 - .../logical/stream/stream_plan_indexscan_local.go | 82 +--- 20 files changed, 301 insertions(+), 947 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 1d0b47a2..7fbff6b3 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -7,6 +7,7 @@ Release Notes. ### Features - Check unregistered nodes in background. +- Improve sorting performance of stream. 
### Bugs diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go index ed2b5ec5..4dcac9d5 100644 --- a/banyand/internal/storage/index.go +++ b/banyand/internal/storage/index.go @@ -216,11 +216,11 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, filter var sortedSeriesList pbv1.SeriesList for iter.Next() { - seriesID, _, _ := iter.Val() - if !pl.Contains(seriesID) { + docID := iter.Val().DocID + if !pl.Contains(docID) { continue } - sortedSeriesList = appendSeriesList(sortedSeriesList, seriesList, common.SeriesID(seriesID)) + sortedSeriesList = appendSeriesList(sortedSeriesList, seriesList, common.SeriesID(docID)) if err != nil { return nil, err } diff --git a/banyand/stream/benchmark_test.go b/banyand/stream/benchmark_test.go index 4b0b4cbc..b0c711c4 100644 --- a/banyand/stream/benchmark_test.go +++ b/banyand/stream/benchmark_test.go @@ -348,8 +348,9 @@ func BenchmarkFilter(b *testing.B) { db := write(b, p, esList, docsList) s := generateStream(db) sqo := generateStreamQueryOptions(p, idx) + sqo.Order = nil b.Run("filter-"+p.scenario, func(b *testing.B) { - res, err := s.Filter(context.TODO(), sqo) + res, err := s.Query(context.TODO(), sqo) require.NoError(b, err) logicalstream.BuildElementsFromStreamResult(res) }) @@ -364,8 +365,9 @@ func BenchmarkSort(b *testing.B) { s := generateStream(db) sqo := generateStreamQueryOptions(p, idx) b.Run("sort-"+p.scenario, func(b *testing.B) { - _, err := s.Sort(context.TODO(), sqo) + res, err := s.Query(context.TODO(), sqo) require.NoError(b, err) + logicalstream.BuildElementsFromStreamResult(res) }) } } diff --git a/banyand/stream/block.go b/banyand/stream/block.go index 252b3947..71a2456c 100644 --- a/banyand/stream/block.go +++ b/banyand/stream/block.go @@ -410,7 +410,7 @@ var blockPool sync.Pool type blockCursor struct { p *part timestamps []int64 - expectedTimestamps []int64 + filteredTimestamps []int64 elementIDs []string tagFamilies []tagFamily tagValuesDecoder 
encoding.BytesBlockDecoder @@ -446,9 +446,9 @@ func (bc *blockCursor) init(p *part, bm *blockMetadata, opts queryOptions) { bc.minTimestamp = opts.minTimestamp bc.maxTimestamp = opts.maxTimestamp bc.tagProjection = opts.TagProjection - if opts.elementRefMap != nil { - seriesID := bc.bm.seriesID - bc.expectedTimestamps = opts.elementRefMap[seriesID] + seriesID := bc.bm.seriesID + if opts.filteredRefMap != nil { + bc.filteredTimestamps = opts.filteredRefMap[seriesID] } } @@ -464,9 +464,11 @@ func (bc *blockCursor) copyAllTo(r *pbv1.StreamResult, desc bool) { if offset <= idx { return } - r.SID = bc.bm.seriesID r.Timestamps = append(r.Timestamps, bc.timestamps[idx:offset]...) r.ElementIDs = append(r.ElementIDs, bc.elementIDs[idx:offset]...) + for i := idx; i < offset; i++ { + r.SIDs = append(r.SIDs, bc.bm.seriesID) + } if len(r.TagFamilies) != len(bc.tagProjection) { for _, tp := range bc.tagProjection { tf := pbv1.TagFamily{ @@ -497,9 +499,9 @@ func (bc *blockCursor) copyAllTo(r *pbv1.StreamResult, desc bool) { } func (bc *blockCursor) copyTo(r *pbv1.StreamResult) { - r.SID = bc.bm.seriesID r.Timestamps = append(r.Timestamps, bc.timestamps[bc.idx]) r.ElementIDs = append(r.ElementIDs, bc.elementIDs[bc.idx]) + r.SIDs = append(r.SIDs, bc.bm.seriesID) if len(r.TagFamilies) != len(bc.tagProjection) { for _, tp := range bc.tagProjection { tf := pbv1.TagFamily{ @@ -550,8 +552,8 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool { idxList := make([]int, 0) var start, end int - if bc.expectedTimestamps != nil { - for _, ts := range bc.expectedTimestamps { + if bc.filteredTimestamps != nil { + for _, ts := range bc.filteredTimestamps { idx := timestamp.Find(tmpBlock.timestamps, ts) if idx == -1 { continue @@ -588,7 +590,7 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool { logger.Panicf("unexpected number of values for tags %q: got %d; want %d", tmpBlock.tagFamilies[i].tags[blockIndex].name, len(tmpBlock.tagFamilies[i].tags[blockIndex].values), 
len(tmpBlock.timestamps)) } - if bc.expectedTimestamps != nil { + if bc.filteredTimestamps != nil { for _, idx := range idxList { t.values = append(t.values, tmpBlock.tagFamilies[i].tags[blockIndex].values[idx]) } diff --git a/banyand/stream/index.go b/banyand/stream/index.go index e1a1d42a..2455834c 100644 --- a/banyand/stream/index.go +++ b/banyand/stream/index.go @@ -54,7 +54,7 @@ func newElementIndex(ctx context.Context, root string, flushTimeoutSeconds int64 return ei, nil } -func (e *elementIndex) Sort(sids []common.SeriesID, fieldKey index.FieldKey, order modelv1.Sort, preloadSize int) (index.FieldIterator, error) { +func (e *elementIndex) Sort(sids []common.SeriesID, fieldKey index.FieldKey, order modelv1.Sort, preloadSize int) (index.FieldIterator[*index.ItemRef], error) { iter, err := e.store.Sort(sids, fieldKey, order, preloadSize) if err != nil { return nil, err @@ -72,6 +72,7 @@ func (e *elementIndex) Search(_ context.Context, seriesList pbv1.SeriesList, fil timeRange *timestamp.TimeRange, order *pbv1.OrderBy, ) ([]elementRef, error) { pm := make(map[common.SeriesID][]uint64) + desc := order != nil && order.Index == nil && order.Sort == modelv1.Sort_SORT_DESC for _, series := range seriesList { pl, err := filter.Execute(func(_ databasev1.IndexRule_Type) (index.Searcher, error) { return e.store, nil @@ -86,7 +87,7 @@ func (e *elementIndex) Search(_ context.Context, seriesList pbv1.SeriesList, fil continue } timestamps := pl.ToSlice() - if order != nil && order.Index == nil && order.Sort == modelv1.Sort_SORT_DESC { + if desc { sort.Slice(timestamps, func(i, j int) bool { return timestamps[i] > timestamps[j] }) @@ -102,7 +103,7 @@ func (e *elementIndex) Search(_ context.Context, seriesList pbv1.SeriesList, fil pm[series.ID] = timestamps[start : end+1] } } - return merge(pm), nil + return merge(pm, !desc), nil } func (e *elementIndex) Close() error { @@ -119,34 +120,43 @@ type indexedElementRef struct { elemIdx int } -type priorityQueue 
[]*indexedElementRef +type priorityQueue struct { + elementRefList []*indexedElementRef + asc bool +} -func (pq priorityQueue) Len() int { return len(pq) } +func (pq priorityQueue) Len() int { return len(pq.elementRefList) } func (pq priorityQueue) Less(i, j int) bool { - return pq[i].timestamp < pq[j].timestamp + if pq.asc { + return pq.elementRefList[i].timestamp < pq.elementRefList[j].timestamp + } + return pq.elementRefList[i].timestamp > pq.elementRefList[j].timestamp } func (pq priorityQueue) Swap(i, j int) { - pq[i], pq[j] = pq[j], pq[i] + pq.elementRefList[i], pq.elementRefList[j] = pq.elementRefList[j], pq.elementRefList[i] } func (pq *priorityQueue) Push(x interface{}) { item := x.(*indexedElementRef) - *pq = append(*pq, item) + pq.elementRefList = append(pq.elementRefList, item) } func (pq *priorityQueue) Pop() interface{} { - old := *pq - n := len(old) - item := old[n-1] - *pq = old[0 : n-1] + n := len(pq.elementRefList) + item := pq.elementRefList[n-1] + pq.elementRefList = pq.elementRefList[0 : n-1] return item } -func merge(postingMap map[common.SeriesID][]uint64) []elementRef { +func merge(postingMap map[common.SeriesID][]uint64, asc bool) []elementRef { var result []elementRef - pq := make(priorityQueue, 0) + erl := make([]*indexedElementRef, 0) + pq := priorityQueue{ + elementRefList: erl, + asc: asc, + } heap.Init(&pq) for seriesID, timestamps := range postingMap { diff --git a/banyand/stream/iter.go b/banyand/stream/iter.go deleted file mode 100644 index 39412ca2..00000000 --- a/banyand/stream/iter.go +++ /dev/null @@ -1,148 +0,0 @@ -// Licensed to Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package stream - -import ( - "errors" - "io" - - "go.uber.org/multierr" - - "github.com/apache/skywalking-banyandb/api/common" - databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" - "github.com/apache/skywalking-banyandb/pkg/index" - "github.com/apache/skywalking-banyandb/pkg/logger" - "github.com/apache/skywalking-banyandb/pkg/partition" - pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" -) - -type searcherIterator struct { - fieldIterator index.FieldIterator - err error - tagSpecIndex map[string]*databasev1.TagSpec - timeFilter filterFn - table *tsTable - l *logger.Logger - indexFilter map[common.SeriesID]filterFn - tagProjIndex map[string]partition.TagLocator - sidToIndex map[common.SeriesID]int - entityMap map[string]int - tagProjection []pbv1.TagProjection - seriesList pbv1.SeriesList - currItem item - sortedTagLocation tagLocation -} - -func newSearcherIterator(l *logger.Logger, fieldIterator index.FieldIterator, table *tsTable, - indexFilter map[common.SeriesID]filterFn, timeFilter filterFn, tagProjection []pbv1.TagProjection, - sortedTagLocation tagLocation, tagSpecIndex map[string]*databasev1.TagSpec, - tagProjIndex map[string]partition.TagLocator, sidToIndex map[common.SeriesID]int, - seriesList pbv1.SeriesList, entityMap map[string]int, -) *searcherIterator { - return &searcherIterator{ - fieldIterator: fieldIterator, - table: table, - indexFilter: indexFilter, - timeFilter: timeFilter, - l: l, - tagProjection: tagProjection, - sortedTagLocation: sortedTagLocation, - tagSpecIndex: tagSpecIndex, 
- tagProjIndex: tagProjIndex, - sidToIndex: sidToIndex, - seriesList: seriesList, - entityMap: entityMap, - } -} - -func (s *searcherIterator) Next() bool { - if s.err != nil { - return false - } - if !s.fieldIterator.Next() { - s.err = io.EOF - return false - } - itemID, seriesID, _ := s.fieldIterator.Val() - if !s.timeFilter(itemID) { - return s.Next() - } - if s.indexFilter != nil { - if f, ok := s.indexFilter[seriesID]; ok && !f(itemID) { - return s.Next() - } - } - e, c, err := s.table.getElement(seriesID, int64(itemID), s.tagProjection) - if err != nil { - s.err = err - return false - } - if len(s.tagProjIndex) != 0 { - for entity, offset := range s.tagProjIndex { - tagSpec := s.tagSpecIndex[entity] - if tagSpec.IndexedOnly { - continue - } - index, ok := s.sidToIndex[seriesID] - if !ok { - continue - } - series := s.seriesList[index] - entityPos := s.entityMap[entity] - 1 - e.tagFamilies[offset.FamilyOffset].tags[offset.TagOffset] = tag{ - name: entity, - values: mustEncodeTagValue(entity, tagSpec.GetType(), series.EntityValues[entityPos], c), - valueType: pbv1.MustTagValueToValueType(series.EntityValues[entityPos]), - } - } - } - sv, err := s.sortedTagLocation.getTagValue(e) - if err != nil { - s.err = err - return false - } - s.currItem = item{ - element: e, - count: c, - sortedTagValue: sv, - seriesID: seriesID, - } - return true -} - -func (s *searcherIterator) Val() item { - return s.currItem -} - -func (s *searcherIterator) Close() error { - if errors.Is(s.err, io.EOF) { - return s.fieldIterator.Close() - } - return multierr.Combine(s.err, s.fieldIterator.Close()) -} - -type item struct { - element *element - sortedTagValue []byte - count int - seriesID common.SeriesID -} - -func (i item) SortedField() []byte { - return i.sortedTagValue -} diff --git a/banyand/stream/iter_builder.go b/banyand/stream/iter_builder.go deleted file mode 100644 index f225acf9..00000000 --- a/banyand/stream/iter_builder.go +++ /dev/null @@ -1,124 +0,0 @@ -// Licensed to 
Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package stream - -import ( - "bytes" - "fmt" - - "github.com/apache/skywalking-banyandb/api/common" - databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" - "github.com/apache/skywalking-banyandb/banyand/internal/storage" - "github.com/apache/skywalking-banyandb/pkg/index" - pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" -) - -type filterFn func(itemID uint64) bool - -func (s *stream) buildSeriesByIndex(tableWrappers []storage.TSTableWrapper[*tsTable], - seriesList pbv1.SeriesList, sqo pbv1.StreamQueryOptions, -) (series []*searcherIterator, err error) { - timeFilter := func(itemID uint64) bool { - return sqo.TimeRange.Contains(int64(itemID)) - } - indexRuleForSorting := sqo.Order.Index - if len(indexRuleForSorting.Tags) != 1 { - return nil, fmt.Errorf("only support one tag for sorting, but got %d", len(indexRuleForSorting.Tags)) - } - sortedTag := indexRuleForSorting.Tags[0] - tl := newTagLocation() - for i := range sqo.TagProjection { - for j := range sqo.TagProjection[i].Names { - if sqo.TagProjection[i].Names[j] == sortedTag { - tl.familyIndex, tl.tagIndex = i, j - } - } - } - if !tl.valid() { - return nil, 
fmt.Errorf("sorted tag %s not found in tag projection", sortedTag) - } - entityMap, tagSpecIndex, tagProjIndex, sidToIndex := s.genIndex(sqo.TagProjection, seriesList) - sids := seriesList.IDs() - for _, tw := range tableWrappers { - seriesFilter := make(map[common.SeriesID]filterFn) - if sqo.Filter != nil { - for i := range sids { - pl, errExe := sqo.Filter.Execute(func(_ databasev1.IndexRule_Type) (index.Searcher, error) { - return tw.Table().Index().store, nil - }, sids[i]) - if errExe != nil { - return nil, err - } - - seriesFilter[sids[i]] = func(itemID uint64) bool { - if pl == nil { - return true - } - return pl.Contains(itemID) - } - } - } - - var inner index.FieldIterator - fieldKey := index.FieldKey{ - IndexRuleID: indexRuleForSorting.GetMetadata().GetId(), - Analyzer: indexRuleForSorting.GetAnalyzer(), - } - inner, err = tw.Table().Index().Sort(sids, fieldKey, sqo.Order.Sort, sqo.MaxElementSize) - if err != nil { - return nil, err - } - - if inner != nil { - series = append(series, newSearcherIterator(s.l, inner, tw.Table(), - seriesFilter, timeFilter, sqo.TagProjection, tl, - tagSpecIndex, tagProjIndex, sidToIndex, seriesList, entityMap)) - } - } - return -} - -type tagLocation struct { - familyIndex int - tagIndex int -} - -func newTagLocation() tagLocation { - return tagLocation{ - familyIndex: -1, - tagIndex: -1, - } -} - -func (t tagLocation) valid() bool { - return t.familyIndex != -1 && t.tagIndex != -1 -} - -func (t tagLocation) getTagValue(e *element) ([]byte, error) { - if len(e.tagFamilies) <= t.familyIndex { - return nil, fmt.Errorf("tag family index %d out of range", t.familyIndex) - } - if len(e.tagFamilies[t.familyIndex].tags) <= t.tagIndex { - return nil, fmt.Errorf("tag index %d out of range", t.tagIndex) - } - if len(e.tagFamilies[t.familyIndex].tags[t.tagIndex].values) <= e.index { - return nil, fmt.Errorf("element index %d out of range", e.index) - } - v := e.tagFamilies[t.familyIndex].tags[t.tagIndex].values[e.index] - return 
bytes.Clone(v), nil -} diff --git a/banyand/stream/part.go b/banyand/stream/part.go index 9d9d4836..a834e2e7 100644 --- a/banyand/stream/part.go +++ b/banyand/stream/part.go @@ -18,7 +18,6 @@ package stream import ( - "errors" "fmt" "path" "path/filepath" @@ -29,11 +28,8 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/bytes" - "github.com/apache/skywalking-banyandb/pkg/compress/zstd" - "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" - pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" ) const ( @@ -47,67 +43,6 @@ const ( tagFamiliesFilenameExt = ".tf" ) -type columnElements struct { - elementID []string - // TODO: change it to 1d array after refactoring low-level query - tagFamilies [][]pbv1.TagFamily - timestamp []int64 -} - -func newColumnElements() *columnElements { - ces := &columnElements{} - ces.elementID = make([]string, 0) - ces.tagFamilies = make([][]pbv1.TagFamily, 0) - ces.timestamp = make([]int64, 0) - return ces -} - -func (ces *columnElements) BuildFromElement(e *element, tp []pbv1.TagProjection) { - tagFamilies := make([]pbv1.TagFamily, 0) - for i, tf := range e.tagFamilies { - tagFamily := pbv1.TagFamily{ - Name: tf.name, - } - for j, t := range tf.tags { - tag := pbv1.Tag{ - Name: t.name, - } - if tag.Name == "" { - tag.Name = tp[i].Names[j] - tag.Values = append(tag.Values, pbv1.NullTagValue) - } else { - tag.Values = append(tag.Values, mustDecodeTagValue(t.valueType, t.values[e.index])) - } - tagFamily.Tags = append(tagFamily.Tags, tag) - } - tagFamilies = append(tagFamilies, tagFamily) - } - ces.tagFamilies = append(ces.tagFamilies, tagFamilies) - ces.elementID = append(ces.elementID, e.elementID) - ces.timestamp = append(ces.timestamp, e.timestamp) -} - -func (ces *columnElements) Pull() *pbv1.StreamColumnResult { - r := &pbv1.StreamColumnResult{} - r.Timestamps = make([]int64, 0) - 
r.ElementIDs = make([]string, 0) - r.TagFamilies = make([][]pbv1.TagFamily, len(ces.tagFamilies)) - r.Timestamps = append(r.Timestamps, ces.timestamp...) - r.ElementIDs = append(r.ElementIDs, ces.elementID...) - for i, tfs := range ces.tagFamilies { - r.TagFamilies[i] = make([]pbv1.TagFamily, 0) - r.TagFamilies[i] = append(r.TagFamilies[i], tfs...) - } - return r -} - -type element struct { - elementID string - tagFamilies []*tagFamily - timestamp int64 - index int -} - type part struct { primary fs.Reader timestamps fs.Reader @@ -120,10 +55,6 @@ type part struct { partMetadata partMetadata } -func (p *part) containTimestamp(timestamp int64) bool { - return timestamp >= p.partMetadata.MinTimestamp && timestamp <= p.partMetadata.MaxTimestamp -} - func (p *part) close() { fs.MustClose(p.primary) fs.MustClose(p.timestamps) @@ -140,99 +71,6 @@ func (p *part) String() string { return fmt.Sprintf("part %d", p.partMetadata.ID) } -func (p *part) getElement(seriesID common.SeriesID, timestamp int64, tagProjection []pbv1.TagProjection) (*element, int, error) { - // TODO: refactor to column-based query - // TODO: cache blocks - if seriesID < p.primaryBlockMetadata[0].seriesID { - return nil, 0, errors.New("element not found") - } - for i, primaryMeta := range p.primaryBlockMetadata { - if seriesID < p.primaryBlockMetadata[i].seriesID { - continue - } - - compressedPrimaryBuf := make([]byte, primaryMeta.size) - fs.MustReadData(p.primary, int64(primaryMeta.offset), compressedPrimaryBuf) - var err error - primaryBuf := make([]byte, 0) - primaryBuf, err = zstd.Decompress(primaryBuf[:0], compressedPrimaryBuf) - if err != nil { - return nil, 0, fmt.Errorf("cannot decompress index block: %w", err) - } - bm := make([]blockMetadata, 0) - bm, err = unmarshalBlockMetadata(bm, primaryBuf) - if err != nil { - return nil, 0, fmt.Errorf("cannot unmarshal index block: %w", err) - } - for i := range bm { - if bm[i].seriesID == seriesID && bm[i].timestamps.max >= timestamp && 
bm[i].timestamps.min <= timestamp { - timestamps := make([]int64, 0) - timestamps = mustReadTimestampsFrom(timestamps, &bm[i].timestamps, int(bm[i].count), p.timestamps) - for j, ts := range timestamps { - if timestamp == ts { - elementIDs := make([]string, 0) - elementIDs = mustReadElementIDsFrom(elementIDs, &bm[i].elementIDs, int(bm[i].count), p.elementIDs) - tfs := make([]*tagFamily, 0) - for k := range tagProjection { - name := tagProjection[k].Family - block, ok := bm[i].tagFamilies[name] - if !ok { - tfs = append(tfs, &tagFamily{name: name, tags: make([]tag, len(tagProjection[k].Names))}) - continue - } - decoder := &encoding.BytesBlockDecoder{} - tf := unmarshalTagFamily(decoder, name, block, tagProjection[k].Names, p.tagFamilyMetadata[name], p.tagFamilies[name], len(timestamps)) - tfs = append(tfs, tf) - } - - return &element{ - timestamp: timestamps[j], - elementID: elementIDs[j], - tagFamilies: tfs, - index: j, - }, len(timestamps), nil - } - if ts > timestamp { - break - } - } - } - } - } - return nil, 0, errors.New("element not found") -} - -func unmarshalTagFamily(decoder *encoding.BytesBlockDecoder, name string, - tagFamilyMetadataBlock *dataBlock, tagProjection []string, metaReader, valueReader fs.Reader, count int, -) *tagFamily { - if len(tagProjection) < 1 { - return &tagFamily{} - } - bb := bigValuePool.Generate() - bb.Buf = bytes.ResizeExact(bb.Buf, int(tagFamilyMetadataBlock.size)) - fs.MustReadData(metaReader, int64(tagFamilyMetadataBlock.offset), bb.Buf) - tfm := generateTagFamilyMetadata() - defer releaseTagFamilyMetadata(tfm) - err := tfm.unmarshal(bb.Buf) - if err != nil { - logger.Panicf("%s: cannot unmarshal tagFamilyMetadata: %v", metaReader.Path(), err) - } - bigValuePool.Release(bb) - tf := tagFamily{} - tf.name = name - tf.tags = tf.resizeTags(len(tagProjection)) - - for j := range tagProjection { - for i := range tfm.tagMetadata { - if tagProjection[j] == tfm.tagMetadata[i].name { - tf.tags[j].mustReadValues(decoder, valueReader, 
tfm.tagMetadata[i], uint64(count)) - break - } - } - } - return &tf -} - func openMemPart(mp *memPart) *part { var p part p.partMetadata = mp.partMetadata diff --git a/banyand/stream/query.go b/banyand/stream/query.go index f9476d55..2edff01c 100644 --- a/banyand/stream/query.go +++ b/banyand/stream/query.go @@ -24,21 +24,24 @@ import ( "fmt" "sort" - "go.uber.org/multierr" - "github.com/apache/skywalking-banyandb/api/common" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/pkg/convert" + "github.com/apache/skywalking-banyandb/pkg/index" itersort "github.com/apache/skywalking-banyandb/pkg/iter/sort" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/partition" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/query/logical" + "github.com/apache/skywalking-banyandb/pkg/timestamp" ) +type seriesTimestampMap map[common.SeriesID][]int64 + type queryOptions struct { - elementRefMap map[common.SeriesID][]int64 + filteredRefMap seriesTimestampMap pbv1.StreamQueryOptions minTimestamp int64 maxTimestamp int64 @@ -140,16 +143,17 @@ func strArrTagValue(values []string) *modelv1.TagValue { } type queryResult struct { - entityMap map[string]int - sidToIndex map[common.SeriesID]int - tagNameIndex map[string]partition.TagLocator - schema *databasev1.Stream - data []*blockCursor - snapshots []*snapshot - seriesList pbv1.SeriesList - loaded bool - orderByTS bool - ascTS bool + entityMap map[string]int + sidToIndex map[common.SeriesID]int + tagNameIndex map[string]partition.TagLocator + schema *databasev1.Stream + elementRefList []*elementRef + data []*blockCursor + snapshots []*snapshot + seriesList pbv1.SeriesList + loaded bool + orderByTS bool + asc bool } func (qr *queryResult) 
Pull() *pbv1.StreamResult { @@ -231,9 +235,13 @@ func (qr *queryResult) Pull() *pbv1.StreamResult { qr.loaded = true heap.Init(qr) } + if len(qr.data) == 0 { return nil } + if !qr.orderByTS { + return qr.mergeByTagValue() + } if len(qr.data) == 1 { r := &pbv1.StreamResult{} bc := qr.data[0] @@ -241,7 +249,7 @@ func (qr *queryResult) Pull() *pbv1.StreamResult { qr.data = qr.data[:0] return r } - return qr.merge() + return qr.mergeByTimestamp() } func (qr *queryResult) Release() { @@ -261,17 +269,13 @@ func (qr queryResult) Len() int { } func (qr queryResult) Less(i, j int) bool { - leftTS := qr.data[i].timestamps[qr.data[i].idx] - rightTS := qr.data[j].timestamps[qr.data[j].idx] - if qr.orderByTS { - if qr.ascTS { - return leftTS < rightTS - } - return leftTS > rightTS + leftIdx, rightIdx := qr.data[i].idx, qr.data[j].idx + leftTS := qr.data[i].timestamps[leftIdx] + rightTS := qr.data[j].timestamps[rightIdx] + if qr.asc { + return leftTS < rightTS } - leftSIDIndex := qr.sidToIndex[qr.data[i].bm.seriesID] - rightSIDIndex := qr.sidToIndex[qr.data[j].bm.seriesID] - return leftSIDIndex < rightSIDIndex + return leftTS > rightTS } func (qr queryResult) Swap(i, j int) { @@ -291,10 +295,59 @@ func (qr *queryResult) Pop() interface{} { } func (qr *queryResult) orderByTimestampDesc() bool { - return qr.orderByTS && !qr.ascTS + return qr.orderByTS && !qr.asc } -func (qr *queryResult) merge() *pbv1.StreamResult { +func (qr *queryResult) mergeByTagValue() *pbv1.StreamResult { + tmp := &pbv1.StreamResult{} + prevIdx := 0 + elementRefToIdx := make(map[elementRef]int) + for _, data := range qr.data { + data.copyAllTo(tmp, qr.orderByTimestampDesc()) + var idx int + for idx = prevIdx; idx < len(tmp.Timestamps); idx++ { + sid, ts := tmp.SIDs[idx], tmp.Timestamps[idx] + er := elementRef{ + seriesID: sid, + timestamp: ts, + } + elementRefToIdx[er] = idx + } + prevIdx = idx + } + + r := &pbv1.StreamResult{ + TagFamilies: []pbv1.TagFamily{}, + } + for _, tagFamily := range tmp.TagFamilies 
{ + tf := pbv1.TagFamily{ + Name: tagFamily.Name, + Tags: []pbv1.Tag{}, + } + for _, tag := range tagFamily.Tags { + t := pbv1.Tag{ + Name: tag.Name, + Values: []*modelv1.TagValue{}, + } + tf.Tags = append(tf.Tags, t) + } + r.TagFamilies = append(r.TagFamilies, tf) + } + for _, er := range qr.elementRefList { + idx := elementRefToIdx[*er] + r.Timestamps = append(r.Timestamps, tmp.Timestamps[idx]) + r.ElementIDs = append(r.ElementIDs, tmp.ElementIDs[idx]) + for i := 0; i < len(r.TagFamilies); i++ { + for j := 0; j < len(r.TagFamilies[i].Tags); j++ { + r.TagFamilies[i].Tags[j].Values = append(r.TagFamilies[i].Tags[j].Values, tmp.TagFamilies[i].Tags[j].Values[idx]) + } + } + } + qr.data = qr.data[:0] + return r +} + +func (qr *queryResult) mergeByTimestamp() *pbv1.StreamResult { step := 1 if qr.orderByTimestampDesc() { step = -1 @@ -330,40 +383,7 @@ func (qr *queryResult) merge() *pbv1.StreamResult { return result } -func (s *stream) genIndex(tagProj []pbv1.TagProjection, seriesList pbv1.SeriesList) (map[string]int, map[string]*databasev1.TagSpec, - map[string]partition.TagLocator, map[common.SeriesID]int, -) { - entityMap := make(map[string]int) - for idx, entity := range s.schema.GetEntity().GetTagNames() { - entityMap[entity] = idx + 1 - } - tagSpecIndex := make(map[string]*databasev1.TagSpec) - for _, tagFamilySpec := range s.schema.GetTagFamilies() { - for _, tagSpec := range tagFamilySpec.Tags { - tagSpecIndex[tagSpec.GetName()] = tagSpec - } - } - tagProjIndex := make(map[string]partition.TagLocator) - for i, tagFamilyProj := range tagProj { - for j, tagProj := range tagFamilyProj.Names { - if entityMap[tagProj] == 0 { - continue - } - tagProjIndex[tagProj] = partition.TagLocator{ - FamilyOffset: i, - TagOffset: j, - } - } - } - sidToIndex := make(map[common.SeriesID]int) - for idx, series := range seriesList { - sidToIndex[series.ID] = idx - } - - return entityMap, tagSpecIndex, tagProjIndex, sidToIndex -} - -func (s *stream) Query(ctx context.Context, sqo 
pbv1.StreamQueryOptions) (pbv1.StreamQueryResult, error) { +func (s *stream) Query(ctx context.Context, sqo pbv1.StreamQueryOptions) (sqr pbv1.StreamQueryResult, err error) { if sqo.TimeRange == nil || len(sqo.Entities) < 1 { return nil, errors.New("invalid query options: timeRange and series are required") } @@ -373,7 +393,7 @@ func (s *stream) Query(ctx context.Context, sqo pbv1.StreamQueryOptions) (pbv1.S db := s.databaseSupplier.SupplyTSDB() var result queryResult if db == nil { - return &result, nil + return sqr, nil } tsdb := db.(storage.TSDB[*tsTable, option]) tabWrappers := tsdb.SelectTSTables(*sqo.TimeRange) @@ -382,6 +402,7 @@ func (s *stream) Query(ctx context.Context, sqo pbv1.StreamQueryOptions) (pbv1.S tabWrappers[i].DecRef() } }() + series := make([]*pbv1.Series, len(sqo.Entities)) for i := range sqo.Entities { series[i] = &pbv1.Series{ @@ -389,24 +410,33 @@ func (s *stream) Query(ctx context.Context, sqo pbv1.StreamQueryOptions) (pbv1.S EntityValues: sqo.Entities[i], } } - sl, err := tsdb.Lookup(ctx, series) + seriesList, err := tsdb.Lookup(ctx, series) if err != nil { return nil, err } + if len(seriesList) == 0 { + return sqr, nil + } - if len(sl) < 1 { - return &result, nil + filteredRefMap, err := indexSearch(ctx, sqo, tabWrappers, seriesList) + if err != nil { + return nil, err } - var sids []common.SeriesID - for i := range sl { - sids = append(sids, sl[i].ID) + elementRefList, sortedRefMap, err := indexSort(s, sqo, tabWrappers, seriesList, filteredRefMap) + if err != nil { + return nil, err } - var parts []*part + if sortedRefMap != nil { + filteredRefMap = sortedRefMap + } + qo := queryOptions{ StreamQueryOptions: sqo, minTimestamp: sqo.TimeRange.Start.UnixNano(), maxTimestamp: sqo.TimeRange.End.UnixNano(), + filteredRefMap: filteredRefMap, } + var parts []*part var n int for i := range tabWrappers { s := tabWrappers[i].Table().currentSnapshot() @@ -425,6 +455,16 @@ func (s *stream) Query(ctx context.Context, sqo pbv1.StreamQueryOptions) 
(pbv1.S // TODO: cache tstIter var ti tstIter defer ti.reset() + var sids []common.SeriesID + for i := 0; i < len(seriesList); i++ { + sid := seriesList[i].ID + if filteredRefMap != nil && filteredRefMap[sid] == nil { + seriesList = append(seriesList[:i], seriesList[i+1:]...) + i-- + continue + } + sids = append(sids, sid) + } originalSids := make([]common.SeriesID, len(sids)) copy(originalSids, sids) sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j] }) @@ -442,12 +482,12 @@ func (s *stream) Query(ctx context.Context, sqo pbv1.StreamQueryOptions) (pbv1.S return nil, fmt.Errorf("cannot iterate tstIter: %w", ti.Error()) } - entityMap, _, _, sidToIndex := s.genIndex(sqo.TagProjection, sl) + entityMap, sidToIndex := s.genIndex(seriesList) result.entityMap = entityMap result.sidToIndex = sidToIndex result.tagNameIndex = make(map[string]partition.TagLocator) result.schema = s.schema - result.seriesList = sl + result.seriesList = seriesList for i, si := range originalSids { result.sidToIndex[si] = i } @@ -459,224 +499,122 @@ func (s *stream) Query(ctx context.Context, sqo pbv1.StreamQueryOptions) (pbv1.S } } } - result.orderByTS = true if sqo.Order == nil { - result.ascTS = true + result.orderByTS = true + result.asc = true return &result, nil } + if sqo.Order.Index == nil { + result.orderByTS = true + } else { + result.elementRefList = elementRefList + } if sqo.Order.Sort == modelv1.Sort_SORT_ASC || sqo.Order.Sort == modelv1.Sort_SORT_UNSPECIFIED { - result.ascTS = true + result.asc = true } return &result, nil } -func (s *stream) Sort(ctx context.Context, sqo pbv1.StreamQueryOptions) (ssr pbv1.StreamSortResult, err error) { - if sqo.TimeRange == nil || len(sqo.Entities) < 1 { - return nil, errors.New("invalid query options: timeRange and series are required") - } - if len(sqo.TagProjection) == 0 { - return nil, errors.New("invalid query options: tagProjection is required") - } - db := s.databaseSupplier.SupplyTSDB() - if db == nil { - return ssr, nil - } - 
tsdb := db.(storage.TSDB[*tsTable, option]) - tabWrappers := tsdb.SelectTSTables(*sqo.TimeRange) - defer func() { - for i := range tabWrappers { - tabWrappers[i].DecRef() - } - }() - - series := make([]*pbv1.Series, len(sqo.Entities)) - for i := range sqo.Entities { - series[i] = &pbv1.Series{ - Subject: sqo.Name, - EntityValues: sqo.Entities[i], - } - } - seriesList, err := tsdb.Lookup(ctx, series) - if err != nil { - return nil, err - } - if len(seriesList) == 0 { - return ssr, nil - } - - iters, err := s.buildSeriesByIndex(tabWrappers, seriesList, sqo) - if err != nil { - return nil, err - } - if len(iters) == 0 { - return ssr, nil - } - - it := newItemIter(iters, sqo.Order.Sort) - defer func() { - err = multierr.Append(err, it.Close()) - }() - - ces := newColumnElements() - for it.Next() { - nextItem := it.Val() - e := nextItem.element - ces.BuildFromElement(e, sqo.TagProjection) - if len(ces.timestamp) >= sqo.MaxElementSize { - break - } - } - return ces, err -} - -// newItemIter returns a ItemIterator which mergers several tsdb.Iterator by input sorting order. 
-func newItemIter(iters []*searcherIterator, s modelv1.Sort) itersort.Iterator[item] { - var ii []itersort.Iterator[item] - for _, iter := range iters { - ii = append(ii, iter) - } - if s == modelv1.Sort_SORT_DESC { - return itersort.NewItemIter[item](ii, true) - } - return itersort.NewItemIter[item](ii, false) -} - -func (s *stream) Filter(ctx context.Context, sqo pbv1.StreamQueryOptions) (sqr pbv1.StreamQueryResult, err error) { - if sqo.TimeRange == nil || len(sqo.Entities) < 1 { - return nil, errors.New("invalid query options: timeRange and series are required") - } - if len(sqo.TagProjection) == 0 { - return nil, errors.New("invalid query options: tagProjection is required") - } - db := s.databaseSupplier.SupplyTSDB() - var result queryResult - if db == nil { - return sqr, nil - } - tsdb := db.(storage.TSDB[*tsTable, option]) - tabWrappers := tsdb.SelectTSTables(*sqo.TimeRange) - defer func() { - for i := range tabWrappers { - tabWrappers[i].DecRef() - } - }() - - series := make([]*pbv1.Series, len(sqo.Entities)) - for i := range sqo.Entities { - series[i] = &pbv1.Series{ - Subject: sqo.Name, - EntityValues: sqo.Entities[i], - } - } - seriesList, err := tsdb.Lookup(ctx, series) - if err != nil { - return nil, err +func indexSearch(ctx context.Context, sqo pbv1.StreamQueryOptions, + tabWrappers []storage.TSTableWrapper[*tsTable], seriesList pbv1.SeriesList, +) (map[common.SeriesID][]int64, error) { + if sqo.Filter == nil || sqo.Filter == logical.ENode { + return nil, nil } - if len(seriesList) == 0 { - return sqr, nil - } - - var elementRefList []elementRef + var filteredRefList []elementRef for _, tw := range tabWrappers { index := tw.Table().Index() erl, err := index.Search(ctx, seriesList, sqo.Filter, sqo.TimeRange, sqo.Order) if err != nil { return nil, err } - elementRefList = append(elementRefList, erl...) 
- if len(elementRefList) > sqo.MaxElementSize { - elementRefList = elementRefList[:sqo.MaxElementSize] + filteredRefList = append(filteredRefList, erl...) + if (sqo.Order == nil || sqo.Order.Index == nil) && len(filteredRefList) > sqo.MaxElementSize { + filteredRefList = filteredRefList[:sqo.MaxElementSize] break } } - var elementRefMap map[common.SeriesID][]int64 - if len(elementRefList) != 0 { - elementRefMap = make(map[common.SeriesID][]int64) - for _, ref := range elementRefList { - if _, ok := elementRefMap[ref.seriesID]; !ok { - elementRefMap[ref.seriesID] = []int64{ref.timestamp} + filteredRefMap := make(map[common.SeriesID][]int64) + if len(filteredRefList) != 0 { + for _, ref := range filteredRefList { + if _, ok := filteredRefMap[ref.seriesID]; !ok { + filteredRefMap[ref.seriesID] = []int64{ref.timestamp} } else { - elementRefMap[ref.seriesID] = append(elementRefMap[ref.seriesID], ref.timestamp) + filteredRefMap[ref.seriesID] = append(filteredRefMap[ref.seriesID], ref.timestamp) } } } - qo := queryOptions{ - StreamQueryOptions: sqo, - minTimestamp: sqo.TimeRange.Start.UnixNano(), - maxTimestamp: sqo.TimeRange.End.UnixNano(), - elementRefMap: elementRefMap, + return filteredRefMap, nil +} + +func indexSort(s *stream, sqo pbv1.StreamQueryOptions, tabWrappers []storage.TSTableWrapper[*tsTable], + seriesList pbv1.SeriesList, filteredRefMap map[common.SeriesID][]int64, +) ([]*elementRef, map[common.SeriesID][]int64, error) { + if sqo.Order == nil || sqo.Order.Index == nil { + return nil, nil, nil } + iters, err := s.buildItersByIndex(tabWrappers, seriesList, sqo) + if err != nil { + return nil, nil, err + } + desc := sqo.Order != nil && sqo.Order.Index == nil && sqo.Order.Sort == modelv1.Sort_SORT_DESC + itemIter := itersort.NewItemIter[*index.ItemRef](iters, desc) - var parts []*part - var n int - for i := range tabWrappers { - s := tabWrappers[i].Table().currentSnapshot() - if s == nil { - continue + var elementRefList []*elementRef + sortedRefMap := 
make(map[common.SeriesID][]int64) + for { + if !itemIter.Next() { + break } - parts, n = s.getParts(parts, qo.minTimestamp, qo.maxTimestamp) - if n < 1 { - s.decRef() + val := itemIter.Val() + ts, sid := val.DocID, val.SeriesID + if filteredRefMap != nil && (filteredRefMap[sid] == nil || timestamp.Find(filteredRefMap[sid], int64(ts)) == -1) { continue } - result.snapshots = append(result.snapshots, s) - } - bma := generateBlockMetadataArray() - defer releaseBlockMetadataArray(bma) - // TODO: cache tstIter - var ti tstIter - defer ti.reset() - var sids []common.SeriesID - for i := 0; i < len(seriesList); i++ { - sid := seriesList[i].ID - if _, ok := elementRefMap[sid]; !ok { - seriesList = append(seriesList[:i], seriesList[i+1:]...) - i-- - continue + if _, ok := sortedRefMap[sid]; !ok { + sortedRefMap[sid] = []int64{int64(ts)} + } else { + sortedRefMap[sid] = append(sortedRefMap[sid], int64(ts)) + } + elementRefList = append(elementRefList, &elementRef{timestamp: int64(ts), seriesID: sid}) + if len(elementRefList) >= sqo.MaxElementSize { + break } - sids = append(sids, sid) - } - originalSids := make([]common.SeriesID, len(sids)) - copy(originalSids, sids) - sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j] }) - ti.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp) - if ti.Error() != nil { - return nil, fmt.Errorf("cannot init tstIter: %w", ti.Error()) - } - for ti.nextBlock() { - bc := generateBlockCursor() - p := ti.piHeap[0] - bc.init(p.p, p.curBlock, qo) - result.data = append(result.data, bc) - } - if ti.Error() != nil { - return nil, fmt.Errorf("cannot iterate tstIter: %w", ti.Error()) } + return elementRefList, sortedRefMap, nil +} - entityMap, _, _, sidToIndex := s.genIndex(sqo.TagProjection, seriesList) - result.entityMap = entityMap - result.sidToIndex = sidToIndex - result.tagNameIndex = make(map[string]partition.TagLocator) - result.schema = s.schema - result.seriesList = seriesList - for i, si := range originalSids { - 
result.sidToIndex[si] = i - } - for i, tagFamilySpec := range s.schema.GetTagFamilies() { - for j, tagSpec := range tagFamilySpec.GetTags() { - result.tagNameIndex[tagSpec.GetName()] = partition.TagLocator{ - FamilyOffset: i, - TagOffset: j, - } +func (s *stream) buildItersByIndex(tableWrappers []storage.TSTableWrapper[*tsTable], + seriesList pbv1.SeriesList, sqo pbv1.StreamQueryOptions, +) (iters []itersort.Iterator[*index.ItemRef], err error) { + indexRuleForSorting := sqo.Order.Index + if len(indexRuleForSorting.Tags) != 1 { + return nil, fmt.Errorf("only support one tag for sorting, but got %d", len(indexRuleForSorting.Tags)) + } + sids := seriesList.IDs() + for _, tw := range tableWrappers { + var iter index.FieldIterator[*index.ItemRef] + fieldKey := index.FieldKey{ + IndexRuleID: indexRuleForSorting.GetMetadata().GetId(), + Analyzer: indexRuleForSorting.GetAnalyzer(), + } + iter, err = tw.Table().Index().Sort(sids, fieldKey, sqo.Order.Sort, sqo.MaxElementSize) + if err != nil { + return nil, err } + iters = append(iters, iter) } - result.orderByTS = true - if sqo.Order == nil { - result.ascTS = true - return &result, nil + return +} + +func (s *stream) genIndex(seriesList pbv1.SeriesList) (map[string]int, map[common.SeriesID]int) { + entityMap := make(map[string]int) + for idx, entity := range s.schema.GetEntity().GetTagNames() { + entityMap[entity] = idx + 1 } - if sqo.Order.Sort == modelv1.Sort_SORT_ASC || sqo.Order.Sort == modelv1.Sort_SORT_UNSPECIFIED { - result.ascTS = true + sidToIndex := make(map[common.SeriesID]int) + for idx, series := range seriesList { + sidToIndex[series.ID] = idx } - return &result, nil + return entityMap, sidToIndex } diff --git a/banyand/stream/query_test.go b/banyand/stream/query_test.go index fa3e1c61..4bd92c39 100644 --- a/banyand/stream/query_test.go +++ b/banyand/stream/query_test.go @@ -58,7 +58,7 @@ func TestQueryResult(t *testing.T) { minTimestamp: 1, maxTimestamp: 1, want: []pbv1.StreamResult{{ - SID: 1, + SIDs: 
[]common.SeriesID{1}, Timestamps: []int64{1}, ElementIDs: []string{"11"}, TagFamilies: []pbv1.TagFamily{ @@ -75,12 +75,12 @@ func TestQueryResult(t *testing.T) { }}, }, }, { - SID: 3, + SIDs: []common.SeriesID{3, 3}, Timestamps: []int64{1, 1}, ElementIDs: []string{"31", "31"}, TagFamilies: nil, }, { - SID: 2, + SIDs: []common.SeriesID{2, 2}, Timestamps: []int64{1, 1}, ElementIDs: []string{"21", "21"}, TagFamilies: []pbv1.TagFamily{ @@ -90,7 +90,7 @@ func TestQueryResult(t *testing.T) { }}, }, }, { - SID: 1, + SIDs: []common.SeriesID{1}, Timestamps: []int64{1}, ElementIDs: []string{"11"}, TagFamilies: []pbv1.TagFamily{ @@ -115,7 +115,7 @@ func TestQueryResult(t *testing.T) { minTimestamp: 1, maxTimestamp: 2, want: []pbv1.StreamResult{{ - SID: 1, + SIDs: []common.SeriesID{1}, Timestamps: []int64{2}, ElementIDs: []string{"12"}, TagFamilies: []pbv1.TagFamily{ @@ -132,7 +132,7 @@ func TestQueryResult(t *testing.T) { }}, }, }, { - SID: 2, + SIDs: []common.SeriesID{2}, Timestamps: []int64{2}, ElementIDs: []string{"22"}, TagFamilies: []pbv1.TagFamily{ @@ -142,12 +142,12 @@ func TestQueryResult(t *testing.T) { }}, }, }, { - SID: 3, + SIDs: []common.SeriesID{3}, Timestamps: []int64{2}, ElementIDs: []string{"32"}, TagFamilies: nil, }, { - SID: 1, + SIDs: []common.SeriesID{1}, Timestamps: []int64{1}, ElementIDs: []string{"11"}, TagFamilies: []pbv1.TagFamily{ @@ -164,12 +164,12 @@ func TestQueryResult(t *testing.T) { }}, }, }, { - SID: 3, + SIDs: []common.SeriesID{3}, Timestamps: []int64{1}, ElementIDs: []string{"31"}, TagFamilies: nil, }, { - SID: 2, + SIDs: []common.SeriesID{2}, Timestamps: []int64{1}, ElementIDs: []string{"21"}, TagFamilies: []pbv1.TagFamily{ @@ -188,7 +188,7 @@ func TestQueryResult(t *testing.T) { minTimestamp: 1, maxTimestamp: 2, want: []pbv1.StreamResult{{ - SID: 1, + SIDs: []common.SeriesID{1}, Timestamps: []int64{1}, ElementIDs: []string{"11"}, TagFamilies: []pbv1.TagFamily{ @@ -205,12 +205,12 @@ func TestQueryResult(t *testing.T) { }}, }, }, { - SID: 
3, + SIDs: []common.SeriesID{3}, Timestamps: []int64{1}, ElementIDs: []string{"31"}, TagFamilies: nil, }, { - SID: 2, + SIDs: []common.SeriesID{2, 2}, Timestamps: []int64{1, 2}, ElementIDs: []string{"21", "22"}, TagFamilies: []pbv1.TagFamily{ @@ -220,7 +220,7 @@ func TestQueryResult(t *testing.T) { }}, }, }, { - SID: 1, + SIDs: []common.SeriesID{1}, Timestamps: []int64{2}, ElementIDs: []string{"12"}, TagFamilies: []pbv1.TagFamily{ @@ -237,94 +237,12 @@ func TestQueryResult(t *testing.T) { }}, }, }, { - SID: 3, + SIDs: []common.SeriesID{3}, Timestamps: []int64{2}, ElementIDs: []string{"32"}, TagFamilies: nil, }}, }, - { - name: "Test with multiple parts with duplicated data order by Series", - esList: []*elements{esTS1, esTS1}, - sids: []common.SeriesID{1, 2, 3}, - orderBySeries: true, - minTimestamp: 1, - maxTimestamp: 1, - want: []pbv1.StreamResult{{ - SID: 1, - Timestamps: []int64{1, 1}, - ElementIDs: []string{"11", "11"}, - TagFamilies: []pbv1.TagFamily{ - {Name: "arrTag", Tags: []pbv1.Tag{ - {Name: "strArrTag", Values: []*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"}), strArrTagValue([]string{"value1", "value2"})}}, - {Name: "intArrTag", Values: []*modelv1.TagValue{int64ArrTagValue([]int64{25, 30}), int64ArrTagValue([]int64{25, 30})}}, - }}, - {Name: "binaryTag", Tags: []pbv1.Tag{ - {Name: "binaryTag", Values: []*modelv1.TagValue{binaryDataTagValue(longText), binaryDataTagValue(longText)}}, - }}, - {Name: "singleTag", Tags: []pbv1.Tag{ - {Name: "strTag", Values: []*modelv1.TagValue{strTagValue("value1"), strTagValue("value1")}}, - {Name: "intTag", Values: []*modelv1.TagValue{int64TagValue(10), int64TagValue(10)}}, - }}, - }, - }, { - SID: 2, - Timestamps: []int64{1, 1}, - ElementIDs: []string{"21", "21"}, - TagFamilies: []pbv1.TagFamily{ - {Name: "singleTag", Tags: []pbv1.Tag{ - {Name: "strTag1", Values: []*modelv1.TagValue{strTagValue("tag1"), strTagValue("tag1")}}, - {Name: "strTag2", Values: []*modelv1.TagValue{strTagValue("tag2"), 
strTagValue("tag2")}}, - }}, - }, - }, { - SID: 3, - Timestamps: []int64{1, 1}, - ElementIDs: []string{"31", "31"}, - TagFamilies: nil, - }}, - }, - { - name: "Test with multiple parts with multiple data order by Series", - esList: []*elements{esTS1, esTS2}, - sids: []common.SeriesID{2, 1, 3}, - orderBySeries: true, - minTimestamp: 1, - maxTimestamp: 2, - want: []pbv1.StreamResult{{ - SID: 2, - Timestamps: []int64{2, 1}, - ElementIDs: []string{"22", "21"}, - TagFamilies: []pbv1.TagFamily{ - {Name: "singleTag", Tags: []pbv1.Tag{ - {Name: "strTag1", Values: []*modelv1.TagValue{strTagValue("tag3"), strTagValue("tag1")}}, - {Name: "strTag2", Values: []*modelv1.TagValue{strTagValue("tag4"), strTagValue("tag2")}}, - }}, - }, - }, { - SID: 1, - Timestamps: []int64{1, 2}, - ElementIDs: []string{"11", "12"}, - TagFamilies: []pbv1.TagFamily{ - {Name: "arrTag", Tags: []pbv1.Tag{ - {Name: "strArrTag", Values: []*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"}), strArrTagValue([]string{"value5", "value6"})}}, - {Name: "intArrTag", Values: []*modelv1.TagValue{int64ArrTagValue([]int64{25, 30}), int64ArrTagValue([]int64{35, 40})}}, - }}, - {Name: "binaryTag", Tags: []pbv1.Tag{ - {Name: "binaryTag", Values: []*modelv1.TagValue{binaryDataTagValue(longText), binaryDataTagValue(longText)}}, - }}, - {Name: "singleTag", Tags: []pbv1.Tag{ - {Name: "strTag", Values: []*modelv1.TagValue{strTagValue("value1"), strTagValue("value3")}}, - {Name: "intTag", Values: []*modelv1.TagValue{int64TagValue(10), int64TagValue(30)}}, - }}, - }, - }, { - SID: 3, - Timestamps: []int64{2, 1}, - ElementIDs: []string{"32", "31"}, - TagFamilies: nil, - }}, - }, } bma := generateBlockMetadataArray() defer releaseBlockMetadataArray(bma) @@ -365,7 +283,7 @@ func TestQueryResult(t *testing.T) { } } else { result.orderByTS = true - result.ascTS = tt.ascTS + result.asc = tt.ascTS } var got []pbv1.StreamResult for { diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go index 8c6f1b7a..7181ed36 
100644 --- a/banyand/stream/stream.go +++ b/banyand/stream/stream.go @@ -61,8 +61,6 @@ type Stream interface { GetSchema() *databasev1.Stream GetIndexRules() []*databasev1.IndexRule Query(ctx context.Context, opts pbv1.StreamQueryOptions) (pbv1.StreamQueryResult, error) - Sort(ctx context.Context, opts pbv1.StreamQueryOptions) (pbv1.StreamSortResult, error) - Filter(ctx context.Context, opts pbv1.StreamQueryOptions) (pbv1.StreamQueryResult, error) } var _ Stream = (*stream)(nil) diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go index dda0a57b..6fbad53a 100644 --- a/banyand/stream/tstable.go +++ b/banyand/stream/tstable.go @@ -34,7 +34,6 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" - pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/run" "github.com/apache/skywalking-banyandb/pkg/timestamp" "github.com/apache/skywalking-banyandb/pkg/watcher" @@ -284,25 +283,6 @@ func (tst *tsTable) mustAddElements(es *elements) { } } -func (tst *tsTable) getElement(seriesID common.SeriesID, timestamp int64, tagProjection []pbv1.TagProjection) (*element, int, error) { - s := tst.currentSnapshot() - if s == nil { - return nil, 0, fmt.Errorf("snapshot is absent, cannot find element with seriesID %d and timestamp %d", seriesID, timestamp) - } - defer s.decRef() - - for _, p := range s.parts { - if !p.p.containTimestamp(timestamp) { - continue - } - elem, count, err := p.p.getElement(seriesID, timestamp, tagProjection) - if err == nil { - return elem, count, nil - } - } - return nil, 0, fmt.Errorf("cannot find element with seriesID %d and timestamp %d", seriesID, timestamp) -} - type tstIter struct { err error parts []*part diff --git a/pkg/index/index.go b/pkg/index/index.go index 43883931..ff38d5d9 100644 --- a/pkg/index/index.go +++ b/pkg/index/index.go @@ -31,6 +31,7 @@ import ( modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index/posting" + "github.com/apache/skywalking-banyandb/pkg/iter/sort" ) var errMalformed = errors.New("the data is malformed") @@ -163,10 +164,22 @@ func (r RangeOpts) Between(value []byte) int { return 0 } +// ItemRef represents a reference to an item. +type ItemRef struct { + Term []byte + SeriesID common.SeriesID + DocID uint64 +} + +// SortedField returns the value of the sorted field. +func (ir ItemRef) SortedField() []byte { + return ir.Term +} + // FieldIterator allows iterating over a field's posting values. -type FieldIterator interface { +type FieldIterator[T sort.Comparable] interface { Next() bool - Val() (uint64, common.SeriesID, []byte) + Val() T Close() error } @@ -179,8 +192,8 @@ func (i *dummyIterator) Next() bool { return false } -func (i *dummyIterator) Val() (uint64, common.SeriesID, []byte) { - return 0, 0, nil +func (i *dummyIterator) Val() *ItemRef { + return &ItemRef{} } func (i *dummyIterator) Close() error { @@ -211,8 +224,8 @@ type Writer interface { // FieldIterable allows building a FieldIterator. type FieldIterable interface { - Iterator(fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort, preLoadSize int) (iter FieldIterator, err error) - Sort(sids []common.SeriesID, fieldKey FieldKey, order modelv1.Sort, preLoadSize int) (FieldIterator, error) + Iterator(fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort, preLoadSize int) (iter FieldIterator[*ItemRef], err error) + Sort(sids []common.SeriesID, fieldKey FieldKey, order modelv1.Sort, preLoadSize int) (FieldIterator[*ItemRef], error) } // Searcher allows searching a field either by its key or by its key and term. 
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go index ceb9e739..b1d93d73 100644 --- a/pkg/index/inverted/inverted.go +++ b/pkg/index/inverted/inverted.go @@ -160,7 +160,7 @@ func (s *store) Write(fields []index.Field, docID uint64) error { return nil } -func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, order modelv1.Sort, preLoadSize int) (iter index.FieldIterator, err error) { +func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, order modelv1.Sort, preLoadSize int) (iter index.FieldIterator[*index.ItemRef], err error) { if termRange.Lower != nil && termRange.Upper != nil && bytes.Compare(termRange.Lower, termRange.Upper) > 0 { @@ -241,8 +241,7 @@ func (s *store) MatchTerms(field index.Field) (list posting.List, err error) { }() list = roaring.NewPostingList() for iter.Next() { - docID, _, _ := iter.Val() - list.Insert(docID) + list.Insert(iter.Val().DocID) } return list, err } @@ -275,8 +274,7 @@ func (s *store) Match(fieldKey index.FieldKey, matches []string) (posting.List, }() list := roaring.NewPostingList() for iter.Next() { - docID, _, _ := iter.Val() - list.Insert(docID) + list.Insert(iter.Val().DocID) } return list, err } @@ -288,8 +286,7 @@ func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posti } list = roaring.NewPostingList() for iter.Next() { - docID, _, _ := iter.Val() - list.Insert(docID) + list.Insert(iter.Val().DocID) } err = multierr.Append(err, iter.Close()) return @@ -456,8 +453,12 @@ func (bmi *blugeMatchIterator) Next() bool { return bmi.err == nil } -func (bmi *blugeMatchIterator) Val() (uint64, common.SeriesID, []byte) { - return bmi.docID, bmi.seriesID, bmi.term +func (bmi *blugeMatchIterator) Val() *index.ItemRef { + return &index.ItemRef{ + SeriesID: bmi.seriesID, + DocID: bmi.docID, + Term: bmi.term, + } } func (bmi *blugeMatchIterator) Close() error { diff --git a/pkg/index/inverted/sort.go b/pkg/index/inverted/sort.go index 
72291ab9..0447e255 100644 --- a/pkg/index/inverted/sort.go +++ b/pkg/index/inverted/sort.go @@ -31,7 +31,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/index" ) -func (s *store) Sort(sids []common.SeriesID, fieldKey index.FieldKey, order modelv1.Sort, preLoadSize int) (iter index.FieldIterator, err error) { +func (s *store) Sort(sids []common.SeriesID, fieldKey index.FieldKey, order modelv1.Sort, preLoadSize int) (iter index.FieldIterator[*index.ItemRef], err error) { reader, err := s.writer.Reader() if err != nil { return nil, err @@ -127,7 +127,7 @@ func (si *sortIterator) next() bool { return false } -func (si *sortIterator) Val() (uint64, common.SeriesID, []byte) { +func (si *sortIterator) Val() *index.ItemRef { return si.current.Val() } diff --git a/pkg/index/inverted/sort_test.go b/pkg/index/inverted/sort_test.go index 6fe759aa..4c67b994 100644 --- a/pkg/index/inverted/sort_test.go +++ b/pkg/index/inverted/sort_test.go @@ -167,10 +167,10 @@ func TestStore_Sort(t *testing.T) { is.NotNil(iter) var got result for iter.Next() { - docID, _, term := iter.Val() - got.items = append(got.items, docID) - if term != nil { - got.terms = append(got.terms, term) + val := iter.Val() + got.items = append(got.items, val.DocID) + if val.Term != nil { + got.terms = append(got.terms, val.Term) } } for i := 0; i < 10; i++ { diff --git a/pkg/index/testcases/duration.go b/pkg/index/testcases/duration.go index 944b2e4c..569acbf6 100644 --- a/pkg/index/testcases/duration.go +++ b/pkg/index/testcases/duration.go @@ -303,8 +303,7 @@ func RunDuration(t *testing.T, data map[int]posting.List, store SimpleStore) { is.NotNil(iter) var got result for iter.Next() { - docID, _, _ := iter.Val() - got.items = append(got.items, docID) + got.items = append(got.items, iter.Val().DocID) } for i := 0; i < 10; i++ { is.False(iter.Next()) diff --git a/pkg/pb/v1/metadata.go b/pkg/pb/v1/metadata.go index 3aaf08a5..200d52fc 100644 --- a/pkg/pb/v1/metadata.go +++ b/pkg/pb/v1/metadata.go @@ 
-115,14 +115,7 @@ type StreamResult struct { Timestamps []int64 ElementIDs []string TagFamilies []TagFamily - SID common.SeriesID -} - -// StreamColumnResult is the result of a stream sort or filter. -type StreamColumnResult struct { - TagFamilies [][]TagFamily - Timestamps []int64 - ElementIDs []string + SIDs []common.SeriesID } // TagProjection is the projection of a tag family and its tags. @@ -148,11 +141,6 @@ type StreamQueryResult interface { Release() } -// StreamSortResult is the result of a stream sort. -type StreamSortResult interface { - Pull() *StreamColumnResult -} - // OrderByType is the type of order by. type OrderByType int diff --git a/pkg/query/executor/interface.go b/pkg/query/executor/interface.go index a5edf44c..f8234f7f 100644 --- a/pkg/query/executor/interface.go +++ b/pkg/query/executor/interface.go @@ -31,8 +31,6 @@ import ( // StreamExecutionContext allows retrieving data through the stream module. type StreamExecutionContext interface { Query(ctx context.Context, opts pbv1.StreamQueryOptions) (pbv1.StreamQueryResult, error) - Sort(ctx context.Context, opts pbv1.StreamQueryOptions) (pbv1.StreamSortResult, error) - Filter(ctx context.Context, opts pbv1.StreamQueryOptions) (pbv1.StreamQueryResult, error) } // StreamExecutionContextKey is the key of stream execution context in context.Context. 
diff --git a/pkg/query/logical/stream/stream_plan_indexscan_local.go b/pkg/query/logical/stream/stream_plan_indexscan_local.go index 2e7a39fb..5a3032b0 100644 --- a/pkg/query/logical/stream/stream_plan_indexscan_local.go +++ b/pkg/query/logical/stream/stream_plan_indexscan_local.go @@ -71,56 +71,20 @@ func (i *localIndexScan) Execute(ctx context.Context) ([]*streamv1.Element, erro } } ec := executor.FromStreamExecutionContext(ctx) - - if i.order != nil && i.order.Index != nil { - ssr, err := ec.Sort(ctx, pbv1.StreamQueryOptions{ - Name: i.metadata.GetName(), - TimeRange: &i.timeRange, - Entities: i.entities, - Filter: i.filter, - Order: orderBy, - TagProjection: i.projectionTags, - MaxElementSize: i.maxElementSize, - }) - if err != nil { - return nil, err - } - if ssr == nil { - return nil, nil - } - r := ssr.Pull() - return buildElementsFromColumnResult(r), nil - } - - if i.filter != nil && i.filter != logical.ENode { - result, err := ec.Filter(ctx, pbv1.StreamQueryOptions{ - Name: i.metadata.GetName(), - TimeRange: &i.timeRange, - Entities: i.entities, - Filter: i.filter, - Order: orderBy, - TagProjection: i.projectionTags, - MaxElementSize: i.maxElementSize, - }) - if err != nil { - return nil, err - } - if result == nil { - return nil, nil - } - return BuildElementsFromStreamResult(result), nil - } - result, err := ec.Query(ctx, pbv1.StreamQueryOptions{ - Name: i.metadata.GetName(), - TimeRange: &i.timeRange, - Entities: i.entities, - Filter: i.filter, - Order: orderBy, - TagProjection: i.projectionTags, + Name: i.metadata.GetName(), + TimeRange: &i.timeRange, + Entities: i.entities, + Filter: i.filter, + Order: orderBy, + TagProjection: i.projectionTags, + MaxElementSize: i.maxElementSize, }) if err != nil { - return nil, fmt.Errorf("failed to query stream: %w", err) + return nil, err + } + if result == nil { + return nil, nil } return BuildElementsFromStreamResult(result), nil } @@ -142,30 +106,6 @@ func (i *localIndexScan) Schema() logical.Schema { return 
i.schema.ProjTags(i.projectionTagRefs...) } -func buildElementsFromColumnResult(r *pbv1.StreamColumnResult) (elements []*streamv1.Element) { - for i := range r.Timestamps { - e := &streamv1.Element{ - Timestamp: timestamppb.New(time.Unix(0, r.Timestamps[i])), - ElementId: r.ElementIDs[i], - } - - for _, tf := range r.TagFamilies[i] { - tagFamily := &modelv1.TagFamily{ - Name: tf.Name, - } - e.TagFamilies = append(e.TagFamilies, tagFamily) - for _, t := range tf.Tags { - tagFamily.Tags = append(tagFamily.Tags, &modelv1.Tag{ - Key: t.Name, - Value: t.Values[0], - }) - } - } - elements = append(elements, e) - } - return -} - // BuildElementsFromStreamResult builds a slice of elements from the given stream query result. func BuildElementsFromStreamResult(result pbv1.StreamQueryResult) (elements []*streamv1.Element) { deduplication := make(map[string]struct{})