This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 2f37a17c Improve sorting performance of Stream (#459)
2f37a17c is described below

commit 2f37a17c873a91f654a2561c6673012ea4d42243
Author: Huang Youliang <butterbright0...@gmail.com>
AuthorDate: Mon Jun 17 10:23:34 2024 +0800

    Improve sorting performance of Stream (#459)
    
    * Improve sorting performance of Stream
    
    ---------
    
    Co-authored-by: Gao Hongtao <hanahm...@gmail.com>
---
 CHANGES.md                                         |   1 +
 banyand/internal/storage/index.go                  |   6 +-
 banyand/stream/benchmark_test.go                   |   6 +-
 banyand/stream/block.go                            |  20 +-
 banyand/stream/index.go                            |  38 +-
 banyand/stream/iter.go                             | 148 -------
 banyand/stream/iter_builder.go                     | 124 ------
 banyand/stream/part.go                             | 162 --------
 banyand/stream/query.go                            | 450 +++++++++------------
 banyand/stream/query_test.go                       | 114 +-----
 banyand/stream/stream.go                           |   2 -
 banyand/stream/tstable.go                          |  20 -
 pkg/index/index.go                                 |  25 +-
 pkg/index/inverted/inverted.go                     |  19 +-
 pkg/index/inverted/sort.go                         |   4 +-
 pkg/index/inverted/sort_test.go                    |   8 +-
 pkg/index/testcases/duration.go                    |   3 +-
 pkg/pb/v1/metadata.go                              |  14 +-
 pkg/query/executor/interface.go                    |   2 -
 .../logical/stream/stream_plan_indexscan_local.go  |  82 +---
 20 files changed, 301 insertions(+), 947 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 1d0b47a2..7fbff6b3 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -7,6 +7,7 @@ Release Notes.
 ### Features
 
 - Check unregistered nodes in background.
+- Improve sorting performance of stream.
 
 ### Bugs
 
diff --git a/banyand/internal/storage/index.go 
b/banyand/internal/storage/index.go
index ed2b5ec5..4dcac9d5 100644
--- a/banyand/internal/storage/index.go
+++ b/banyand/internal/storage/index.go
@@ -216,11 +216,11 @@ func (s *seriesIndex) Search(ctx context.Context, series 
[]*pbv1.Series, filter
 
        var sortedSeriesList pbv1.SeriesList
        for iter.Next() {
-               seriesID, _, _ := iter.Val()
-               if !pl.Contains(seriesID) {
+               docID := iter.Val().DocID
+               if !pl.Contains(docID) {
                        continue
                }
-               sortedSeriesList = appendSeriesList(sortedSeriesList, 
seriesList, common.SeriesID(seriesID))
+               sortedSeriesList = appendSeriesList(sortedSeriesList, 
seriesList, common.SeriesID(docID))
                if err != nil {
                        return nil, err
                }
diff --git a/banyand/stream/benchmark_test.go b/banyand/stream/benchmark_test.go
index 4b0b4cbc..b0c711c4 100644
--- a/banyand/stream/benchmark_test.go
+++ b/banyand/stream/benchmark_test.go
@@ -348,8 +348,9 @@ func BenchmarkFilter(b *testing.B) {
                db := write(b, p, esList, docsList)
                s := generateStream(db)
                sqo := generateStreamQueryOptions(p, idx)
+               sqo.Order = nil
                b.Run("filter-"+p.scenario, func(b *testing.B) {
-                       res, err := s.Filter(context.TODO(), sqo)
+                       res, err := s.Query(context.TODO(), sqo)
                        require.NoError(b, err)
                        logicalstream.BuildElementsFromStreamResult(res)
                })
@@ -364,8 +365,9 @@ func BenchmarkSort(b *testing.B) {
                s := generateStream(db)
                sqo := generateStreamQueryOptions(p, idx)
                b.Run("sort-"+p.scenario, func(b *testing.B) {
-                       _, err := s.Sort(context.TODO(), sqo)
+                       res, err := s.Query(context.TODO(), sqo)
                        require.NoError(b, err)
+                       logicalstream.BuildElementsFromStreamResult(res)
                })
        }
 }
diff --git a/banyand/stream/block.go b/banyand/stream/block.go
index 252b3947..71a2456c 100644
--- a/banyand/stream/block.go
+++ b/banyand/stream/block.go
@@ -410,7 +410,7 @@ var blockPool sync.Pool
 type blockCursor struct {
        p                  *part
        timestamps         []int64
-       expectedTimestamps []int64
+       filteredTimestamps []int64
        elementIDs         []string
        tagFamilies        []tagFamily
        tagValuesDecoder   encoding.BytesBlockDecoder
@@ -446,9 +446,9 @@ func (bc *blockCursor) init(p *part, bm *blockMetadata, 
opts queryOptions) {
        bc.minTimestamp = opts.minTimestamp
        bc.maxTimestamp = opts.maxTimestamp
        bc.tagProjection = opts.TagProjection
-       if opts.elementRefMap != nil {
-               seriesID := bc.bm.seriesID
-               bc.expectedTimestamps = opts.elementRefMap[seriesID]
+       seriesID := bc.bm.seriesID
+       if opts.filteredRefMap != nil {
+               bc.filteredTimestamps = opts.filteredRefMap[seriesID]
        }
 }
 
@@ -464,9 +464,11 @@ func (bc *blockCursor) copyAllTo(r *pbv1.StreamResult, 
desc bool) {
        if offset <= idx {
                return
        }
-       r.SID = bc.bm.seriesID
        r.Timestamps = append(r.Timestamps, bc.timestamps[idx:offset]...)
        r.ElementIDs = append(r.ElementIDs, bc.elementIDs[idx:offset]...)
+       for i := idx; i < offset; i++ {
+               r.SIDs = append(r.SIDs, bc.bm.seriesID)
+       }
        if len(r.TagFamilies) != len(bc.tagProjection) {
                for _, tp := range bc.tagProjection {
                        tf := pbv1.TagFamily{
@@ -497,9 +499,9 @@ func (bc *blockCursor) copyAllTo(r *pbv1.StreamResult, desc 
bool) {
 }
 
 func (bc *blockCursor) copyTo(r *pbv1.StreamResult) {
-       r.SID = bc.bm.seriesID
        r.Timestamps = append(r.Timestamps, bc.timestamps[bc.idx])
        r.ElementIDs = append(r.ElementIDs, bc.elementIDs[bc.idx])
+       r.SIDs = append(r.SIDs, bc.bm.seriesID)
        if len(r.TagFamilies) != len(bc.tagProjection) {
                for _, tp := range bc.tagProjection {
                        tf := pbv1.TagFamily{
@@ -550,8 +552,8 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool {
 
        idxList := make([]int, 0)
        var start, end int
-       if bc.expectedTimestamps != nil {
-               for _, ts := range bc.expectedTimestamps {
+       if bc.filteredTimestamps != nil {
+               for _, ts := range bc.filteredTimestamps {
                        idx := timestamp.Find(tmpBlock.timestamps, ts)
                        if idx == -1 {
                                continue
@@ -588,7 +590,7 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool {
                                        logger.Panicf("unexpected number of 
values for tags %q: got %d; want %d",
                                                
tmpBlock.tagFamilies[i].tags[blockIndex].name, 
len(tmpBlock.tagFamilies[i].tags[blockIndex].values), len(tmpBlock.timestamps))
                                }
-                               if bc.expectedTimestamps != nil {
+                               if bc.filteredTimestamps != nil {
                                        for _, idx := range idxList {
                                                t.values = append(t.values, 
tmpBlock.tagFamilies[i].tags[blockIndex].values[idx])
                                        }
diff --git a/banyand/stream/index.go b/banyand/stream/index.go
index e1a1d42a..2455834c 100644
--- a/banyand/stream/index.go
+++ b/banyand/stream/index.go
@@ -54,7 +54,7 @@ func newElementIndex(ctx context.Context, root string, 
flushTimeoutSeconds int64
        return ei, nil
 }
 
-func (e *elementIndex) Sort(sids []common.SeriesID, fieldKey index.FieldKey, 
order modelv1.Sort, preloadSize int) (index.FieldIterator, error) {
+func (e *elementIndex) Sort(sids []common.SeriesID, fieldKey index.FieldKey, 
order modelv1.Sort, preloadSize int) (index.FieldIterator[*index.ItemRef], 
error) {
        iter, err := e.store.Sort(sids, fieldKey, order, preloadSize)
        if err != nil {
                return nil, err
@@ -72,6 +72,7 @@ func (e *elementIndex) Search(_ context.Context, seriesList 
pbv1.SeriesList, fil
        timeRange *timestamp.TimeRange, order *pbv1.OrderBy,
 ) ([]elementRef, error) {
        pm := make(map[common.SeriesID][]uint64)
+       desc := order != nil && order.Index == nil && order.Sort == 
modelv1.Sort_SORT_DESC
        for _, series := range seriesList {
                pl, err := filter.Execute(func(_ databasev1.IndexRule_Type) 
(index.Searcher, error) {
                        return e.store, nil
@@ -86,7 +87,7 @@ func (e *elementIndex) Search(_ context.Context, seriesList 
pbv1.SeriesList, fil
                        continue
                }
                timestamps := pl.ToSlice()
-               if order != nil && order.Index == nil && order.Sort == 
modelv1.Sort_SORT_DESC {
+               if desc {
                        sort.Slice(timestamps, func(i, j int) bool {
                                return timestamps[i] > timestamps[j]
                        })
@@ -102,7 +103,7 @@ func (e *elementIndex) Search(_ context.Context, seriesList 
pbv1.SeriesList, fil
                        pm[series.ID] = timestamps[start : end+1]
                }
        }
-       return merge(pm), nil
+       return merge(pm, !desc), nil
 }
 
 func (e *elementIndex) Close() error {
@@ -119,34 +120,43 @@ type indexedElementRef struct {
        elemIdx int
 }
 
-type priorityQueue []*indexedElementRef
+type priorityQueue struct {
+       elementRefList []*indexedElementRef
+       asc            bool
+}
 
-func (pq priorityQueue) Len() int { return len(pq) }
+func (pq priorityQueue) Len() int { return len(pq.elementRefList) }
 
 func (pq priorityQueue) Less(i, j int) bool {
-       return pq[i].timestamp < pq[j].timestamp
+       if pq.asc {
+               return pq.elementRefList[i].timestamp < 
pq.elementRefList[j].timestamp
+       }
+       return pq.elementRefList[i].timestamp > pq.elementRefList[j].timestamp
 }
 
 func (pq priorityQueue) Swap(i, j int) {
-       pq[i], pq[j] = pq[j], pq[i]
+       pq.elementRefList[i], pq.elementRefList[j] = pq.elementRefList[j], 
pq.elementRefList[i]
 }
 
 func (pq *priorityQueue) Push(x interface{}) {
        item := x.(*indexedElementRef)
-       *pq = append(*pq, item)
+       pq.elementRefList = append(pq.elementRefList, item)
 }
 
 func (pq *priorityQueue) Pop() interface{} {
-       old := *pq
-       n := len(old)
-       item := old[n-1]
-       *pq = old[0 : n-1]
+       n := len(pq.elementRefList)
+       item := pq.elementRefList[n-1]
+       pq.elementRefList = pq.elementRefList[0 : n-1]
        return item
 }
 
-func merge(postingMap map[common.SeriesID][]uint64) []elementRef {
+func merge(postingMap map[common.SeriesID][]uint64, asc bool) []elementRef {
        var result []elementRef
-       pq := make(priorityQueue, 0)
+       erl := make([]*indexedElementRef, 0)
+       pq := priorityQueue{
+               elementRefList: erl,
+               asc:            asc,
+       }
        heap.Init(&pq)
 
        for seriesID, timestamps := range postingMap {
diff --git a/banyand/stream/iter.go b/banyand/stream/iter.go
deleted file mode 100644
index 39412ca2..00000000
--- a/banyand/stream/iter.go
+++ /dev/null
@@ -1,148 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package stream
-
-import (
-       "errors"
-       "io"
-
-       "go.uber.org/multierr"
-
-       "github.com/apache/skywalking-banyandb/api/common"
-       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
-       "github.com/apache/skywalking-banyandb/pkg/index"
-       "github.com/apache/skywalking-banyandb/pkg/logger"
-       "github.com/apache/skywalking-banyandb/pkg/partition"
-       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
-)
-
-type searcherIterator struct {
-       fieldIterator     index.FieldIterator
-       err               error
-       tagSpecIndex      map[string]*databasev1.TagSpec
-       timeFilter        filterFn
-       table             *tsTable
-       l                 *logger.Logger
-       indexFilter       map[common.SeriesID]filterFn
-       tagProjIndex      map[string]partition.TagLocator
-       sidToIndex        map[common.SeriesID]int
-       entityMap         map[string]int
-       tagProjection     []pbv1.TagProjection
-       seriesList        pbv1.SeriesList
-       currItem          item
-       sortedTagLocation tagLocation
-}
-
-func newSearcherIterator(l *logger.Logger, fieldIterator index.FieldIterator, 
table *tsTable,
-       indexFilter map[common.SeriesID]filterFn, timeFilter filterFn, 
tagProjection []pbv1.TagProjection,
-       sortedTagLocation tagLocation, tagSpecIndex 
map[string]*databasev1.TagSpec,
-       tagProjIndex map[string]partition.TagLocator, sidToIndex 
map[common.SeriesID]int,
-       seriesList pbv1.SeriesList, entityMap map[string]int,
-) *searcherIterator {
-       return &searcherIterator{
-               fieldIterator:     fieldIterator,
-               table:             table,
-               indexFilter:       indexFilter,
-               timeFilter:        timeFilter,
-               l:                 l,
-               tagProjection:     tagProjection,
-               sortedTagLocation: sortedTagLocation,
-               tagSpecIndex:      tagSpecIndex,
-               tagProjIndex:      tagProjIndex,
-               sidToIndex:        sidToIndex,
-               seriesList:        seriesList,
-               entityMap:         entityMap,
-       }
-}
-
-func (s *searcherIterator) Next() bool {
-       if s.err != nil {
-               return false
-       }
-       if !s.fieldIterator.Next() {
-               s.err = io.EOF
-               return false
-       }
-       itemID, seriesID, _ := s.fieldIterator.Val()
-       if !s.timeFilter(itemID) {
-               return s.Next()
-       }
-       if s.indexFilter != nil {
-               if f, ok := s.indexFilter[seriesID]; ok && !f(itemID) {
-                       return s.Next()
-               }
-       }
-       e, c, err := s.table.getElement(seriesID, int64(itemID), 
s.tagProjection)
-       if err != nil {
-               s.err = err
-               return false
-       }
-       if len(s.tagProjIndex) != 0 {
-               for entity, offset := range s.tagProjIndex {
-                       tagSpec := s.tagSpecIndex[entity]
-                       if tagSpec.IndexedOnly {
-                               continue
-                       }
-                       index, ok := s.sidToIndex[seriesID]
-                       if !ok {
-                               continue
-                       }
-                       series := s.seriesList[index]
-                       entityPos := s.entityMap[entity] - 1
-                       
e.tagFamilies[offset.FamilyOffset].tags[offset.TagOffset] = tag{
-                               name:      entity,
-                               values:    mustEncodeTagValue(entity, 
tagSpec.GetType(), series.EntityValues[entityPos], c),
-                               valueType: 
pbv1.MustTagValueToValueType(series.EntityValues[entityPos]),
-                       }
-               }
-       }
-       sv, err := s.sortedTagLocation.getTagValue(e)
-       if err != nil {
-               s.err = err
-               return false
-       }
-       s.currItem = item{
-               element:        e,
-               count:          c,
-               sortedTagValue: sv,
-               seriesID:       seriesID,
-       }
-       return true
-}
-
-func (s *searcherIterator) Val() item {
-       return s.currItem
-}
-
-func (s *searcherIterator) Close() error {
-       if errors.Is(s.err, io.EOF) {
-               return s.fieldIterator.Close()
-       }
-       return multierr.Combine(s.err, s.fieldIterator.Close())
-}
-
-type item struct {
-       element        *element
-       sortedTagValue []byte
-       count          int
-       seriesID       common.SeriesID
-}
-
-func (i item) SortedField() []byte {
-       return i.sortedTagValue
-}
diff --git a/banyand/stream/iter_builder.go b/banyand/stream/iter_builder.go
deleted file mode 100644
index f225acf9..00000000
--- a/banyand/stream/iter_builder.go
+++ /dev/null
@@ -1,124 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package stream
-
-import (
-       "bytes"
-       "fmt"
-
-       "github.com/apache/skywalking-banyandb/api/common"
-       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
-       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
-       "github.com/apache/skywalking-banyandb/pkg/index"
-       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
-)
-
-type filterFn func(itemID uint64) bool
-
-func (s *stream) buildSeriesByIndex(tableWrappers 
[]storage.TSTableWrapper[*tsTable],
-       seriesList pbv1.SeriesList, sqo pbv1.StreamQueryOptions,
-) (series []*searcherIterator, err error) {
-       timeFilter := func(itemID uint64) bool {
-               return sqo.TimeRange.Contains(int64(itemID))
-       }
-       indexRuleForSorting := sqo.Order.Index
-       if len(indexRuleForSorting.Tags) != 1 {
-               return nil, fmt.Errorf("only support one tag for sorting, but 
got %d", len(indexRuleForSorting.Tags))
-       }
-       sortedTag := indexRuleForSorting.Tags[0]
-       tl := newTagLocation()
-       for i := range sqo.TagProjection {
-               for j := range sqo.TagProjection[i].Names {
-                       if sqo.TagProjection[i].Names[j] == sortedTag {
-                               tl.familyIndex, tl.tagIndex = i, j
-                       }
-               }
-       }
-       if !tl.valid() {
-               return nil, fmt.Errorf("sorted tag %s not found in tag 
projection", sortedTag)
-       }
-       entityMap, tagSpecIndex, tagProjIndex, sidToIndex := 
s.genIndex(sqo.TagProjection, seriesList)
-       sids := seriesList.IDs()
-       for _, tw := range tableWrappers {
-               seriesFilter := make(map[common.SeriesID]filterFn)
-               if sqo.Filter != nil {
-                       for i := range sids {
-                               pl, errExe := sqo.Filter.Execute(func(_ 
databasev1.IndexRule_Type) (index.Searcher, error) {
-                                       return tw.Table().Index().store, nil
-                               }, sids[i])
-                               if errExe != nil {
-                                       return nil, err
-                               }
-
-                               seriesFilter[sids[i]] = func(itemID uint64) 
bool {
-                                       if pl == nil {
-                                               return true
-                                       }
-                                       return pl.Contains(itemID)
-                               }
-                       }
-               }
-
-               var inner index.FieldIterator
-               fieldKey := index.FieldKey{
-                       IndexRuleID: indexRuleForSorting.GetMetadata().GetId(),
-                       Analyzer:    indexRuleForSorting.GetAnalyzer(),
-               }
-               inner, err = tw.Table().Index().Sort(sids, fieldKey, 
sqo.Order.Sort, sqo.MaxElementSize)
-               if err != nil {
-                       return nil, err
-               }
-
-               if inner != nil {
-                       series = append(series, newSearcherIterator(s.l, inner, 
tw.Table(),
-                               seriesFilter, timeFilter, sqo.TagProjection, tl,
-                               tagSpecIndex, tagProjIndex, sidToIndex, 
seriesList, entityMap))
-               }
-       }
-       return
-}
-
-type tagLocation struct {
-       familyIndex int
-       tagIndex    int
-}
-
-func newTagLocation() tagLocation {
-       return tagLocation{
-               familyIndex: -1,
-               tagIndex:    -1,
-       }
-}
-
-func (t tagLocation) valid() bool {
-       return t.familyIndex != -1 && t.tagIndex != -1
-}
-
-func (t tagLocation) getTagValue(e *element) ([]byte, error) {
-       if len(e.tagFamilies) <= t.familyIndex {
-               return nil, fmt.Errorf("tag family index %d out of range", 
t.familyIndex)
-       }
-       if len(e.tagFamilies[t.familyIndex].tags) <= t.tagIndex {
-               return nil, fmt.Errorf("tag index %d out of range", t.tagIndex)
-       }
-       if len(e.tagFamilies[t.familyIndex].tags[t.tagIndex].values) <= e.index 
{
-               return nil, fmt.Errorf("element index %d out of range", e.index)
-       }
-       v := e.tagFamilies[t.familyIndex].tags[t.tagIndex].values[e.index]
-       return bytes.Clone(v), nil
-}
diff --git a/banyand/stream/part.go b/banyand/stream/part.go
index 9d9d4836..a834e2e7 100644
--- a/banyand/stream/part.go
+++ b/banyand/stream/part.go
@@ -18,7 +18,6 @@
 package stream
 
 import (
-       "errors"
        "fmt"
        "path"
        "path/filepath"
@@ -29,11 +28,8 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/bytes"
-       "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
-       "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
-       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 )
 
 const (
@@ -47,67 +43,6 @@ const (
        tagFamiliesFilenameExt         = ".tf"
 )
 
-type columnElements struct {
-       elementID []string
-       // TODO: change it to 1d array after refactoring low-level query
-       tagFamilies [][]pbv1.TagFamily
-       timestamp   []int64
-}
-
-func newColumnElements() *columnElements {
-       ces := &columnElements{}
-       ces.elementID = make([]string, 0)
-       ces.tagFamilies = make([][]pbv1.TagFamily, 0)
-       ces.timestamp = make([]int64, 0)
-       return ces
-}
-
-func (ces *columnElements) BuildFromElement(e *element, tp 
[]pbv1.TagProjection) {
-       tagFamilies := make([]pbv1.TagFamily, 0)
-       for i, tf := range e.tagFamilies {
-               tagFamily := pbv1.TagFamily{
-                       Name: tf.name,
-               }
-               for j, t := range tf.tags {
-                       tag := pbv1.Tag{
-                               Name: t.name,
-                       }
-                       if tag.Name == "" {
-                               tag.Name = tp[i].Names[j]
-                               tag.Values = append(tag.Values, 
pbv1.NullTagValue)
-                       } else {
-                               tag.Values = append(tag.Values, 
mustDecodeTagValue(t.valueType, t.values[e.index]))
-                       }
-                       tagFamily.Tags = append(tagFamily.Tags, tag)
-               }
-               tagFamilies = append(tagFamilies, tagFamily)
-       }
-       ces.tagFamilies = append(ces.tagFamilies, tagFamilies)
-       ces.elementID = append(ces.elementID, e.elementID)
-       ces.timestamp = append(ces.timestamp, e.timestamp)
-}
-
-func (ces *columnElements) Pull() *pbv1.StreamColumnResult {
-       r := &pbv1.StreamColumnResult{}
-       r.Timestamps = make([]int64, 0)
-       r.ElementIDs = make([]string, 0)
-       r.TagFamilies = make([][]pbv1.TagFamily, len(ces.tagFamilies))
-       r.Timestamps = append(r.Timestamps, ces.timestamp...)
-       r.ElementIDs = append(r.ElementIDs, ces.elementID...)
-       for i, tfs := range ces.tagFamilies {
-               r.TagFamilies[i] = make([]pbv1.TagFamily, 0)
-               r.TagFamilies[i] = append(r.TagFamilies[i], tfs...)
-       }
-       return r
-}
-
-type element struct {
-       elementID   string
-       tagFamilies []*tagFamily
-       timestamp   int64
-       index       int
-}
-
 type part struct {
        primary              fs.Reader
        timestamps           fs.Reader
@@ -120,10 +55,6 @@ type part struct {
        partMetadata         partMetadata
 }
 
-func (p *part) containTimestamp(timestamp int64) bool {
-       return timestamp >= p.partMetadata.MinTimestamp && timestamp <= 
p.partMetadata.MaxTimestamp
-}
-
 func (p *part) close() {
        fs.MustClose(p.primary)
        fs.MustClose(p.timestamps)
@@ -140,99 +71,6 @@ func (p *part) String() string {
        return fmt.Sprintf("part %d", p.partMetadata.ID)
 }
 
-func (p *part) getElement(seriesID common.SeriesID, timestamp int64, 
tagProjection []pbv1.TagProjection) (*element, int, error) {
-       // TODO: refactor to column-based query
-       // TODO: cache blocks
-       if seriesID < p.primaryBlockMetadata[0].seriesID {
-               return nil, 0, errors.New("element not found")
-       }
-       for i, primaryMeta := range p.primaryBlockMetadata {
-               if seriesID < p.primaryBlockMetadata[i].seriesID {
-                       continue
-               }
-
-               compressedPrimaryBuf := make([]byte, primaryMeta.size)
-               fs.MustReadData(p.primary, int64(primaryMeta.offset), 
compressedPrimaryBuf)
-               var err error
-               primaryBuf := make([]byte, 0)
-               primaryBuf, err = zstd.Decompress(primaryBuf[:0], 
compressedPrimaryBuf)
-               if err != nil {
-                       return nil, 0, fmt.Errorf("cannot decompress index 
block: %w", err)
-               }
-               bm := make([]blockMetadata, 0)
-               bm, err = unmarshalBlockMetadata(bm, primaryBuf)
-               if err != nil {
-                       return nil, 0, fmt.Errorf("cannot unmarshal index 
block: %w", err)
-               }
-               for i := range bm {
-                       if bm[i].seriesID == seriesID && bm[i].timestamps.max 
>= timestamp && bm[i].timestamps.min <= timestamp {
-                               timestamps := make([]int64, 0)
-                               timestamps = mustReadTimestampsFrom(timestamps, 
&bm[i].timestamps, int(bm[i].count), p.timestamps)
-                               for j, ts := range timestamps {
-                                       if timestamp == ts {
-                                               elementIDs := make([]string, 0)
-                                               elementIDs = 
mustReadElementIDsFrom(elementIDs, &bm[i].elementIDs, int(bm[i].count), 
p.elementIDs)
-                                               tfs := make([]*tagFamily, 0)
-                                               for k := range tagProjection {
-                                                       name := 
tagProjection[k].Family
-                                                       block, ok := 
bm[i].tagFamilies[name]
-                                                       if !ok {
-                                                               tfs = 
append(tfs, &tagFamily{name: name, tags: make([]tag, 
len(tagProjection[k].Names))})
-                                                               continue
-                                                       }
-                                                       decoder := 
&encoding.BytesBlockDecoder{}
-                                                       tf := 
unmarshalTagFamily(decoder, name, block, tagProjection[k].Names, 
p.tagFamilyMetadata[name], p.tagFamilies[name], len(timestamps))
-                                                       tfs = append(tfs, tf)
-                                               }
-
-                                               return &element{
-                                                       timestamp:   
timestamps[j],
-                                                       elementID:   
elementIDs[j],
-                                                       tagFamilies: tfs,
-                                                       index:       j,
-                                               }, len(timestamps), nil
-                                       }
-                                       if ts > timestamp {
-                                               break
-                                       }
-                               }
-                       }
-               }
-       }
-       return nil, 0, errors.New("element not found")
-}
-
-func unmarshalTagFamily(decoder *encoding.BytesBlockDecoder, name string,
-       tagFamilyMetadataBlock *dataBlock, tagProjection []string, metaReader, 
valueReader fs.Reader, count int,
-) *tagFamily {
-       if len(tagProjection) < 1 {
-               return &tagFamily{}
-       }
-       bb := bigValuePool.Generate()
-       bb.Buf = bytes.ResizeExact(bb.Buf, int(tagFamilyMetadataBlock.size))
-       fs.MustReadData(metaReader, int64(tagFamilyMetadataBlock.offset), 
bb.Buf)
-       tfm := generateTagFamilyMetadata()
-       defer releaseTagFamilyMetadata(tfm)
-       err := tfm.unmarshal(bb.Buf)
-       if err != nil {
-               logger.Panicf("%s: cannot unmarshal tagFamilyMetadata: %v", 
metaReader.Path(), err)
-       }
-       bigValuePool.Release(bb)
-       tf := tagFamily{}
-       tf.name = name
-       tf.tags = tf.resizeTags(len(tagProjection))
-
-       for j := range tagProjection {
-               for i := range tfm.tagMetadata {
-                       if tagProjection[j] == tfm.tagMetadata[i].name {
-                               tf.tags[j].mustReadValues(decoder, valueReader, 
tfm.tagMetadata[i], uint64(count))
-                               break
-                       }
-               }
-       }
-       return &tf
-}
-
 func openMemPart(mp *memPart) *part {
        var p part
        p.partMetadata = mp.partMetadata
diff --git a/banyand/stream/query.go b/banyand/stream/query.go
index f9476d55..2edff01c 100644
--- a/banyand/stream/query.go
+++ b/banyand/stream/query.go
@@ -24,21 +24,24 @@ import (
        "fmt"
        "sort"
 
-       "go.uber.org/multierr"
-
        "github.com/apache/skywalking-banyandb/api/common"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/index"
        itersort "github.com/apache/skywalking-banyandb/pkg/iter/sort"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/partition"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/logical"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
+type seriesTimestampMap map[common.SeriesID][]int64
+
 type queryOptions struct {
-       elementRefMap map[common.SeriesID][]int64
+       filteredRefMap seriesTimestampMap
        pbv1.StreamQueryOptions
        minTimestamp int64
        maxTimestamp int64
@@ -140,16 +143,17 @@ func strArrTagValue(values []string) *modelv1.TagValue {
 }
 
// queryResult implements pbv1.StreamQueryResult and heap.Interface: its
// blockCursors are merged via a heap when ordering by timestamp, or
// re-ordered through elementRefList when ordering by an index.
type queryResult struct {
	entityMap      map[string]int                   // entity tag name -> 1-based position in the stream's entity
	sidToIndex     map[common.SeriesID]int          // series ID -> position in the original lookup order
	tagNameIndex   map[string]partition.TagLocator  // tag name -> (family, tag) offsets in the schema
	schema         *databasev1.Stream               // stream schema the result rows conform to
	elementRefList []*elementRef                    // index-sorted (seriesID, timestamp) order; set only for index-ordered queries
	data           []*blockCursor                   // pending block cursors still to be merged
	snapshots      []*snapshot                      // snapshots pinned for the lifetime of the result; released in Release
	seriesList     pbv1.SeriesList                  // series that survived lookup/filtering
	loaded         bool                             // whether Pull has initialized the heap
	orderByTS      bool                             // true when ordering by timestamp rather than by index
	asc            bool                             // sort direction shared by both ordering modes
}
 
 func (qr *queryResult) Pull() *pbv1.StreamResult {
@@ -231,9 +235,13 @@ func (qr *queryResult) Pull() *pbv1.StreamResult {
                qr.loaded = true
                heap.Init(qr)
        }
+
        if len(qr.data) == 0 {
                return nil
        }
+       if !qr.orderByTS {
+               return qr.mergeByTagValue()
+       }
        if len(qr.data) == 1 {
                r := &pbv1.StreamResult{}
                bc := qr.data[0]
@@ -241,7 +249,7 @@ func (qr *queryResult) Pull() *pbv1.StreamResult {
                qr.data = qr.data[:0]
                return r
        }
-       return qr.merge()
+       return qr.mergeByTimestamp()
 }
 
 func (qr *queryResult) Release() {
@@ -261,17 +269,13 @@ func (qr queryResult) Len() int {
 }
 
 func (qr queryResult) Less(i, j int) bool {
-       leftTS := qr.data[i].timestamps[qr.data[i].idx]
-       rightTS := qr.data[j].timestamps[qr.data[j].idx]
-       if qr.orderByTS {
-               if qr.ascTS {
-                       return leftTS < rightTS
-               }
-               return leftTS > rightTS
+       leftIdx, rightIdx := qr.data[i].idx, qr.data[j].idx
+       leftTS := qr.data[i].timestamps[leftIdx]
+       rightTS := qr.data[j].timestamps[rightIdx]
+       if qr.asc {
+               return leftTS < rightTS
        }
-       leftSIDIndex := qr.sidToIndex[qr.data[i].bm.seriesID]
-       rightSIDIndex := qr.sidToIndex[qr.data[j].bm.seriesID]
-       return leftSIDIndex < rightSIDIndex
+       return leftTS > rightTS
 }
 
 func (qr queryResult) Swap(i, j int) {
@@ -291,10 +295,59 @@ func (qr *queryResult) Pop() interface{} {
 }
 
 func (qr *queryResult) orderByTimestampDesc() bool {
-       return qr.orderByTS && !qr.ascTS
+       return qr.orderByTS && !qr.asc
 }
 
-func (qr *queryResult) merge() *pbv1.StreamResult {
+func (qr *queryResult) mergeByTagValue() *pbv1.StreamResult {
+       tmp := &pbv1.StreamResult{}
+       prevIdx := 0
+       elementRefToIdx := make(map[elementRef]int)
+       for _, data := range qr.data {
+               data.copyAllTo(tmp, qr.orderByTimestampDesc())
+               var idx int
+               for idx = prevIdx; idx < len(tmp.Timestamps); idx++ {
+                       sid, ts := tmp.SIDs[idx], tmp.Timestamps[idx]
+                       er := elementRef{
+                               seriesID:  sid,
+                               timestamp: ts,
+                       }
+                       elementRefToIdx[er] = idx
+               }
+               prevIdx = idx
+       }
+
+       r := &pbv1.StreamResult{
+               TagFamilies: []pbv1.TagFamily{},
+       }
+       for _, tagFamily := range tmp.TagFamilies {
+               tf := pbv1.TagFamily{
+                       Name: tagFamily.Name,
+                       Tags: []pbv1.Tag{},
+               }
+               for _, tag := range tagFamily.Tags {
+                       t := pbv1.Tag{
+                               Name:   tag.Name,
+                               Values: []*modelv1.TagValue{},
+                       }
+                       tf.Tags = append(tf.Tags, t)
+               }
+               r.TagFamilies = append(r.TagFamilies, tf)
+       }
+       for _, er := range qr.elementRefList {
+               idx := elementRefToIdx[*er]
+               r.Timestamps = append(r.Timestamps, tmp.Timestamps[idx])
+               r.ElementIDs = append(r.ElementIDs, tmp.ElementIDs[idx])
+               for i := 0; i < len(r.TagFamilies); i++ {
+                       for j := 0; j < len(r.TagFamilies[i].Tags); j++ {
+                               r.TagFamilies[i].Tags[j].Values = 
append(r.TagFamilies[i].Tags[j].Values, tmp.TagFamilies[i].Tags[j].Values[idx])
+                       }
+               }
+       }
+       qr.data = qr.data[:0]
+       return r
+}
+
+func (qr *queryResult) mergeByTimestamp() *pbv1.StreamResult {
        step := 1
        if qr.orderByTimestampDesc() {
                step = -1
@@ -330,40 +383,7 @@ func (qr *queryResult) merge() *pbv1.StreamResult {
        return result
 }
 
-func (s *stream) genIndex(tagProj []pbv1.TagProjection, seriesList 
pbv1.SeriesList) (map[string]int, map[string]*databasev1.TagSpec,
-       map[string]partition.TagLocator, map[common.SeriesID]int,
-) {
-       entityMap := make(map[string]int)
-       for idx, entity := range s.schema.GetEntity().GetTagNames() {
-               entityMap[entity] = idx + 1
-       }
-       tagSpecIndex := make(map[string]*databasev1.TagSpec)
-       for _, tagFamilySpec := range s.schema.GetTagFamilies() {
-               for _, tagSpec := range tagFamilySpec.Tags {
-                       tagSpecIndex[tagSpec.GetName()] = tagSpec
-               }
-       }
-       tagProjIndex := make(map[string]partition.TagLocator)
-       for i, tagFamilyProj := range tagProj {
-               for j, tagProj := range tagFamilyProj.Names {
-                       if entityMap[tagProj] == 0 {
-                               continue
-                       }
-                       tagProjIndex[tagProj] = partition.TagLocator{
-                               FamilyOffset: i,
-                               TagOffset:    j,
-                       }
-               }
-       }
-       sidToIndex := make(map[common.SeriesID]int)
-       for idx, series := range seriesList {
-               sidToIndex[series.ID] = idx
-       }
-
-       return entityMap, tagSpecIndex, tagProjIndex, sidToIndex
-}
-
-func (s *stream) Query(ctx context.Context, sqo pbv1.StreamQueryOptions) 
(pbv1.StreamQueryResult, error) {
+func (s *stream) Query(ctx context.Context, sqo pbv1.StreamQueryOptions) (sqr 
pbv1.StreamQueryResult, err error) {
        if sqo.TimeRange == nil || len(sqo.Entities) < 1 {
                return nil, errors.New("invalid query options: timeRange and 
series are required")
        }
@@ -373,7 +393,7 @@ func (s *stream) Query(ctx context.Context, sqo 
pbv1.StreamQueryOptions) (pbv1.S
        db := s.databaseSupplier.SupplyTSDB()
        var result queryResult
        if db == nil {
-               return &result, nil
+               return sqr, nil
        }
        tsdb := db.(storage.TSDB[*tsTable, option])
        tabWrappers := tsdb.SelectTSTables(*sqo.TimeRange)
@@ -382,6 +402,7 @@ func (s *stream) Query(ctx context.Context, sqo 
pbv1.StreamQueryOptions) (pbv1.S
                        tabWrappers[i].DecRef()
                }
        }()
+
        series := make([]*pbv1.Series, len(sqo.Entities))
        for i := range sqo.Entities {
                series[i] = &pbv1.Series{
@@ -389,24 +410,33 @@ func (s *stream) Query(ctx context.Context, sqo 
pbv1.StreamQueryOptions) (pbv1.S
                        EntityValues: sqo.Entities[i],
                }
        }
-       sl, err := tsdb.Lookup(ctx, series)
+       seriesList, err := tsdb.Lookup(ctx, series)
        if err != nil {
                return nil, err
        }
+       if len(seriesList) == 0 {
+               return sqr, nil
+       }
 
-       if len(sl) < 1 {
-               return &result, nil
+       filteredRefMap, err := indexSearch(ctx, sqo, tabWrappers, seriesList)
+       if err != nil {
+               return nil, err
        }
-       var sids []common.SeriesID
-       for i := range sl {
-               sids = append(sids, sl[i].ID)
+       elementRefList, sortedRefMap, err := indexSort(s, sqo, tabWrappers, 
seriesList, filteredRefMap)
+       if err != nil {
+               return nil, err
        }
-       var parts []*part
+       if sortedRefMap != nil {
+               filteredRefMap = sortedRefMap
+       }
+
        qo := queryOptions{
                StreamQueryOptions: sqo,
                minTimestamp:       sqo.TimeRange.Start.UnixNano(),
                maxTimestamp:       sqo.TimeRange.End.UnixNano(),
+               filteredRefMap:     filteredRefMap,
        }
+       var parts []*part
        var n int
        for i := range tabWrappers {
                s := tabWrappers[i].Table().currentSnapshot()
@@ -425,6 +455,16 @@ func (s *stream) Query(ctx context.Context, sqo 
pbv1.StreamQueryOptions) (pbv1.S
        // TODO: cache tstIter
        var ti tstIter
        defer ti.reset()
+       var sids []common.SeriesID
+       for i := 0; i < len(seriesList); i++ {
+               sid := seriesList[i].ID
+               if filteredRefMap != nil && filteredRefMap[sid] == nil {
+                       seriesList = append(seriesList[:i], seriesList[i+1:]...)
+                       i--
+                       continue
+               }
+               sids = append(sids, sid)
+       }
        originalSids := make([]common.SeriesID, len(sids))
        copy(originalSids, sids)
        sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j] })
@@ -442,12 +482,12 @@ func (s *stream) Query(ctx context.Context, sqo 
pbv1.StreamQueryOptions) (pbv1.S
                return nil, fmt.Errorf("cannot iterate tstIter: %w", ti.Error())
        }
 
-       entityMap, _, _, sidToIndex := s.genIndex(sqo.TagProjection, sl)
+       entityMap, sidToIndex := s.genIndex(seriesList)
        result.entityMap = entityMap
        result.sidToIndex = sidToIndex
        result.tagNameIndex = make(map[string]partition.TagLocator)
        result.schema = s.schema
-       result.seriesList = sl
+       result.seriesList = seriesList
        for i, si := range originalSids {
                result.sidToIndex[si] = i
        }
@@ -459,224 +499,122 @@ func (s *stream) Query(ctx context.Context, sqo 
pbv1.StreamQueryOptions) (pbv1.S
                        }
                }
        }
-       result.orderByTS = true
        if sqo.Order == nil {
-               result.ascTS = true
+               result.orderByTS = true
+               result.asc = true
                return &result, nil
        }
+       if sqo.Order.Index == nil {
+               result.orderByTS = true
+       } else {
+               result.elementRefList = elementRefList
+       }
        if sqo.Order.Sort == modelv1.Sort_SORT_ASC || sqo.Order.Sort == 
modelv1.Sort_SORT_UNSPECIFIED {
-               result.ascTS = true
+               result.asc = true
        }
        return &result, nil
 }
 
-func (s *stream) Sort(ctx context.Context, sqo pbv1.StreamQueryOptions) (ssr 
pbv1.StreamSortResult, err error) {
-       if sqo.TimeRange == nil || len(sqo.Entities) < 1 {
-               return nil, errors.New("invalid query options: timeRange and 
series are required")
-       }
-       if len(sqo.TagProjection) == 0 {
-               return nil, errors.New("invalid query options: tagProjection is 
required")
-       }
-       db := s.databaseSupplier.SupplyTSDB()
-       if db == nil {
-               return ssr, nil
-       }
-       tsdb := db.(storage.TSDB[*tsTable, option])
-       tabWrappers := tsdb.SelectTSTables(*sqo.TimeRange)
-       defer func() {
-               for i := range tabWrappers {
-                       tabWrappers[i].DecRef()
-               }
-       }()
-
-       series := make([]*pbv1.Series, len(sqo.Entities))
-       for i := range sqo.Entities {
-               series[i] = &pbv1.Series{
-                       Subject:      sqo.Name,
-                       EntityValues: sqo.Entities[i],
-               }
-       }
-       seriesList, err := tsdb.Lookup(ctx, series)
-       if err != nil {
-               return nil, err
-       }
-       if len(seriesList) == 0 {
-               return ssr, nil
-       }
-
-       iters, err := s.buildSeriesByIndex(tabWrappers, seriesList, sqo)
-       if err != nil {
-               return nil, err
-       }
-       if len(iters) == 0 {
-               return ssr, nil
-       }
-
-       it := newItemIter(iters, sqo.Order.Sort)
-       defer func() {
-               err = multierr.Append(err, it.Close())
-       }()
-
-       ces := newColumnElements()
-       for it.Next() {
-               nextItem := it.Val()
-               e := nextItem.element
-               ces.BuildFromElement(e, sqo.TagProjection)
-               if len(ces.timestamp) >= sqo.MaxElementSize {
-                       break
-               }
-       }
-       return ces, err
-}
-
-// newItemIter returns a ItemIterator which mergers several tsdb.Iterator by 
input sorting order.
-func newItemIter(iters []*searcherIterator, s modelv1.Sort) 
itersort.Iterator[item] {
-       var ii []itersort.Iterator[item]
-       for _, iter := range iters {
-               ii = append(ii, iter)
-       }
-       if s == modelv1.Sort_SORT_DESC {
-               return itersort.NewItemIter[item](ii, true)
-       }
-       return itersort.NewItemIter[item](ii, false)
-}
-
-func (s *stream) Filter(ctx context.Context, sqo pbv1.StreamQueryOptions) (sqr 
pbv1.StreamQueryResult, err error) {
-       if sqo.TimeRange == nil || len(sqo.Entities) < 1 {
-               return nil, errors.New("invalid query options: timeRange and 
series are required")
-       }
-       if len(sqo.TagProjection) == 0 {
-               return nil, errors.New("invalid query options: tagProjection is 
required")
-       }
-       db := s.databaseSupplier.SupplyTSDB()
-       var result queryResult
-       if db == nil {
-               return sqr, nil
-       }
-       tsdb := db.(storage.TSDB[*tsTable, option])
-       tabWrappers := tsdb.SelectTSTables(*sqo.TimeRange)
-       defer func() {
-               for i := range tabWrappers {
-                       tabWrappers[i].DecRef()
-               }
-       }()
-
-       series := make([]*pbv1.Series, len(sqo.Entities))
-       for i := range sqo.Entities {
-               series[i] = &pbv1.Series{
-                       Subject:      sqo.Name,
-                       EntityValues: sqo.Entities[i],
-               }
-       }
-       seriesList, err := tsdb.Lookup(ctx, series)
-       if err != nil {
-               return nil, err
+func indexSearch(ctx context.Context, sqo pbv1.StreamQueryOptions,
+       tabWrappers []storage.TSTableWrapper[*tsTable], seriesList 
pbv1.SeriesList,
+) (map[common.SeriesID][]int64, error) {
+       if sqo.Filter == nil || sqo.Filter == logical.ENode {
+               return nil, nil
        }
-       if len(seriesList) == 0 {
-               return sqr, nil
-       }
-
-       var elementRefList []elementRef
+       var filteredRefList []elementRef
        for _, tw := range tabWrappers {
                index := tw.Table().Index()
                erl, err := index.Search(ctx, seriesList, sqo.Filter, 
sqo.TimeRange, sqo.Order)
                if err != nil {
                        return nil, err
                }
-               elementRefList = append(elementRefList, erl...)
-               if len(elementRefList) > sqo.MaxElementSize {
-                       elementRefList = elementRefList[:sqo.MaxElementSize]
+               filteredRefList = append(filteredRefList, erl...)
+               if (sqo.Order == nil || sqo.Order.Index == nil) && 
len(filteredRefList) > sqo.MaxElementSize {
+                       filteredRefList = filteredRefList[:sqo.MaxElementSize]
                        break
                }
        }
-       var elementRefMap map[common.SeriesID][]int64
-       if len(elementRefList) != 0 {
-               elementRefMap = make(map[common.SeriesID][]int64)
-               for _, ref := range elementRefList {
-                       if _, ok := elementRefMap[ref.seriesID]; !ok {
-                               elementRefMap[ref.seriesID] = 
[]int64{ref.timestamp}
+       filteredRefMap := make(map[common.SeriesID][]int64)
+       if len(filteredRefList) != 0 {
+               for _, ref := range filteredRefList {
+                       if _, ok := filteredRefMap[ref.seriesID]; !ok {
+                               filteredRefMap[ref.seriesID] = 
[]int64{ref.timestamp}
                        } else {
-                               elementRefMap[ref.seriesID] = 
append(elementRefMap[ref.seriesID], ref.timestamp)
+                               filteredRefMap[ref.seriesID] = 
append(filteredRefMap[ref.seriesID], ref.timestamp)
                        }
                }
        }
-       qo := queryOptions{
-               StreamQueryOptions: sqo,
-               minTimestamp:       sqo.TimeRange.Start.UnixNano(),
-               maxTimestamp:       sqo.TimeRange.End.UnixNano(),
-               elementRefMap:      elementRefMap,
+       return filteredRefMap, nil
+}
+
+func indexSort(s *stream, sqo pbv1.StreamQueryOptions, tabWrappers 
[]storage.TSTableWrapper[*tsTable],
+       seriesList pbv1.SeriesList, filteredRefMap map[common.SeriesID][]int64,
+) ([]*elementRef, map[common.SeriesID][]int64, error) {
+       if sqo.Order == nil || sqo.Order.Index == nil {
+               return nil, nil, nil
        }
+       iters, err := s.buildItersByIndex(tabWrappers, seriesList, sqo)
+       if err != nil {
+               return nil, nil, err
+       }
+       desc := sqo.Order != nil && sqo.Order.Index == nil && sqo.Order.Sort == 
modelv1.Sort_SORT_DESC
+       itemIter := itersort.NewItemIter[*index.ItemRef](iters, desc)
 
-       var parts []*part
-       var n int
-       for i := range tabWrappers {
-               s := tabWrappers[i].Table().currentSnapshot()
-               if s == nil {
-                       continue
+       var elementRefList []*elementRef
+       sortedRefMap := make(map[common.SeriesID][]int64)
+       for {
+               if !itemIter.Next() {
+                       break
                }
-               parts, n = s.getParts(parts, qo.minTimestamp, qo.maxTimestamp)
-               if n < 1 {
-                       s.decRef()
+               val := itemIter.Val()
+               ts, sid := val.DocID, val.SeriesID
+               if filteredRefMap != nil && (filteredRefMap[sid] == nil || 
timestamp.Find(filteredRefMap[sid], int64(ts)) == -1) {
                        continue
                }
-               result.snapshots = append(result.snapshots, s)
-       }
-       bma := generateBlockMetadataArray()
-       defer releaseBlockMetadataArray(bma)
-       // TODO: cache tstIter
-       var ti tstIter
-       defer ti.reset()
-       var sids []common.SeriesID
-       for i := 0; i < len(seriesList); i++ {
-               sid := seriesList[i].ID
-               if _, ok := elementRefMap[sid]; !ok {
-                       seriesList = append(seriesList[:i], seriesList[i+1:]...)
-                       i--
-                       continue
+               if _, ok := sortedRefMap[sid]; !ok {
+                       sortedRefMap[sid] = []int64{int64(ts)}
+               } else {
+                       sortedRefMap[sid] = append(sortedRefMap[sid], int64(ts))
+               }
+               elementRefList = append(elementRefList, &elementRef{timestamp: 
int64(ts), seriesID: sid})
+               if len(elementRefList) >= sqo.MaxElementSize {
+                       break
                }
-               sids = append(sids, sid)
-       }
-       originalSids := make([]common.SeriesID, len(sids))
-       copy(originalSids, sids)
-       sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j] })
-       ti.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp)
-       if ti.Error() != nil {
-               return nil, fmt.Errorf("cannot init tstIter: %w", ti.Error())
-       }
-       for ti.nextBlock() {
-               bc := generateBlockCursor()
-               p := ti.piHeap[0]
-               bc.init(p.p, p.curBlock, qo)
-               result.data = append(result.data, bc)
-       }
-       if ti.Error() != nil {
-               return nil, fmt.Errorf("cannot iterate tstIter: %w", ti.Error())
        }
+       return elementRefList, sortedRefMap, nil
+}
 
-       entityMap, _, _, sidToIndex := s.genIndex(sqo.TagProjection, seriesList)
-       result.entityMap = entityMap
-       result.sidToIndex = sidToIndex
-       result.tagNameIndex = make(map[string]partition.TagLocator)
-       result.schema = s.schema
-       result.seriesList = seriesList
-       for i, si := range originalSids {
-               result.sidToIndex[si] = i
-       }
-       for i, tagFamilySpec := range s.schema.GetTagFamilies() {
-               for j, tagSpec := range tagFamilySpec.GetTags() {
-                       result.tagNameIndex[tagSpec.GetName()] = 
partition.TagLocator{
-                               FamilyOffset: i,
-                               TagOffset:    j,
-                       }
+func (s *stream) buildItersByIndex(tableWrappers 
[]storage.TSTableWrapper[*tsTable],
+       seriesList pbv1.SeriesList, sqo pbv1.StreamQueryOptions,
+) (iters []itersort.Iterator[*index.ItemRef], err error) {
+       indexRuleForSorting := sqo.Order.Index
+       if len(indexRuleForSorting.Tags) != 1 {
+               return nil, fmt.Errorf("only support one tag for sorting, but 
got %d", len(indexRuleForSorting.Tags))
+       }
+       sids := seriesList.IDs()
+       for _, tw := range tableWrappers {
+               var iter index.FieldIterator[*index.ItemRef]
+               fieldKey := index.FieldKey{
+                       IndexRuleID: indexRuleForSorting.GetMetadata().GetId(),
+                       Analyzer:    indexRuleForSorting.GetAnalyzer(),
+               }
+               iter, err = tw.Table().Index().Sort(sids, fieldKey, 
sqo.Order.Sort, sqo.MaxElementSize)
+               if err != nil {
+                       return nil, err
                }
+               iters = append(iters, iter)
        }
-       result.orderByTS = true
-       if sqo.Order == nil {
-               result.ascTS = true
-               return &result, nil
+       return
+}
+
+func (s *stream) genIndex(seriesList pbv1.SeriesList) (map[string]int, 
map[common.SeriesID]int) {
+       entityMap := make(map[string]int)
+       for idx, entity := range s.schema.GetEntity().GetTagNames() {
+               entityMap[entity] = idx + 1
        }
-       if sqo.Order.Sort == modelv1.Sort_SORT_ASC || sqo.Order.Sort == 
modelv1.Sort_SORT_UNSPECIFIED {
-               result.ascTS = true
+       sidToIndex := make(map[common.SeriesID]int)
+       for idx, series := range seriesList {
+               sidToIndex[series.ID] = idx
        }
-       return &result, nil
+       return entityMap, sidToIndex
 }
diff --git a/banyand/stream/query_test.go b/banyand/stream/query_test.go
index fa3e1c61..4bd92c39 100644
--- a/banyand/stream/query_test.go
+++ b/banyand/stream/query_test.go
@@ -58,7 +58,7 @@ func TestQueryResult(t *testing.T) {
                        minTimestamp: 1,
                        maxTimestamp: 1,
                        want: []pbv1.StreamResult{{
-                               SID:        1,
+                               SIDs:       []common.SeriesID{1},
                                Timestamps: []int64{1},
                                ElementIDs: []string{"11"},
                                TagFamilies: []pbv1.TagFamily{
@@ -75,12 +75,12 @@ func TestQueryResult(t *testing.T) {
                                        }},
                                },
                        }, {
-                               SID:         3,
+                               SIDs:        []common.SeriesID{3, 3},
                                Timestamps:  []int64{1, 1},
                                ElementIDs:  []string{"31", "31"},
                                TagFamilies: nil,
                        }, {
-                               SID:        2,
+                               SIDs:       []common.SeriesID{2, 2},
                                Timestamps: []int64{1, 1},
                                ElementIDs: []string{"21", "21"},
                                TagFamilies: []pbv1.TagFamily{
@@ -90,7 +90,7 @@ func TestQueryResult(t *testing.T) {
                                        }},
                                },
                        }, {
-                               SID:        1,
+                               SIDs:       []common.SeriesID{1},
                                Timestamps: []int64{1},
                                ElementIDs: []string{"11"},
                                TagFamilies: []pbv1.TagFamily{
@@ -115,7 +115,7 @@ func TestQueryResult(t *testing.T) {
                        minTimestamp: 1,
                        maxTimestamp: 2,
                        want: []pbv1.StreamResult{{
-                               SID:        1,
+                               SIDs:       []common.SeriesID{1},
                                Timestamps: []int64{2},
                                ElementIDs: []string{"12"},
                                TagFamilies: []pbv1.TagFamily{
@@ -132,7 +132,7 @@ func TestQueryResult(t *testing.T) {
                                        }},
                                },
                        }, {
-                               SID:        2,
+                               SIDs:       []common.SeriesID{2},
                                Timestamps: []int64{2},
                                ElementIDs: []string{"22"},
                                TagFamilies: []pbv1.TagFamily{
@@ -142,12 +142,12 @@ func TestQueryResult(t *testing.T) {
                                        }},
                                },
                        }, {
-                               SID:         3,
+                               SIDs:        []common.SeriesID{3},
                                Timestamps:  []int64{2},
                                ElementIDs:  []string{"32"},
                                TagFamilies: nil,
                        }, {
-                               SID:        1,
+                               SIDs:       []common.SeriesID{1},
                                Timestamps: []int64{1},
                                ElementIDs: []string{"11"},
                                TagFamilies: []pbv1.TagFamily{
@@ -164,12 +164,12 @@ func TestQueryResult(t *testing.T) {
                                        }},
                                },
                        }, {
-                               SID:         3,
+                               SIDs:        []common.SeriesID{3},
                                Timestamps:  []int64{1},
                                ElementIDs:  []string{"31"},
                                TagFamilies: nil,
                        }, {
-                               SID:        2,
+                               SIDs:       []common.SeriesID{2},
                                Timestamps: []int64{1},
                                ElementIDs: []string{"21"},
                                TagFamilies: []pbv1.TagFamily{
@@ -188,7 +188,7 @@ func TestQueryResult(t *testing.T) {
                        minTimestamp: 1,
                        maxTimestamp: 2,
                        want: []pbv1.StreamResult{{
-                               SID:        1,
+                               SIDs:       []common.SeriesID{1},
                                Timestamps: []int64{1},
                                ElementIDs: []string{"11"},
                                TagFamilies: []pbv1.TagFamily{
@@ -205,12 +205,12 @@ func TestQueryResult(t *testing.T) {
                                        }},
                                },
                        }, {
-                               SID:         3,
+                               SIDs:        []common.SeriesID{3},
                                Timestamps:  []int64{1},
                                ElementIDs:  []string{"31"},
                                TagFamilies: nil,
                        }, {
-                               SID:        2,
+                               SIDs:       []common.SeriesID{2, 2},
                                Timestamps: []int64{1, 2},
                                ElementIDs: []string{"21", "22"},
                                TagFamilies: []pbv1.TagFamily{
@@ -220,7 +220,7 @@ func TestQueryResult(t *testing.T) {
                                        }},
                                },
                        }, {
-                               SID:        1,
+                               SIDs:       []common.SeriesID{1},
                                Timestamps: []int64{2},
                                ElementIDs: []string{"12"},
                                TagFamilies: []pbv1.TagFamily{
@@ -237,94 +237,12 @@ func TestQueryResult(t *testing.T) {
                                        }},
                                },
                        }, {
-                               SID:         3,
+                               SIDs:        []common.SeriesID{3},
                                Timestamps:  []int64{2},
                                ElementIDs:  []string{"32"},
                                TagFamilies: nil,
                        }},
                },
-               {
-                       name:          "Test with multiple parts with 
duplicated data order by Series",
-                       esList:        []*elements{esTS1, esTS1},
-                       sids:          []common.SeriesID{1, 2, 3},
-                       orderBySeries: true,
-                       minTimestamp:  1,
-                       maxTimestamp:  1,
-                       want: []pbv1.StreamResult{{
-                               SID:        1,
-                               Timestamps: []int64{1, 1},
-                               ElementIDs: []string{"11", "11"},
-                               TagFamilies: []pbv1.TagFamily{
-                                       {Name: "arrTag", Tags: []pbv1.Tag{
-                                               {Name: "strArrTag", Values: 
[]*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"}), 
strArrTagValue([]string{"value1", "value2"})}},
-                                               {Name: "intArrTag", Values: 
[]*modelv1.TagValue{int64ArrTagValue([]int64{25, 30}), 
int64ArrTagValue([]int64{25, 30})}},
-                                       }},
-                                       {Name: "binaryTag", Tags: []pbv1.Tag{
-                                               {Name: "binaryTag", Values: 
[]*modelv1.TagValue{binaryDataTagValue(longText), 
binaryDataTagValue(longText)}},
-                                       }},
-                                       {Name: "singleTag", Tags: []pbv1.Tag{
-                                               {Name: "strTag", Values: 
[]*modelv1.TagValue{strTagValue("value1"), strTagValue("value1")}},
-                                               {Name: "intTag", Values: 
[]*modelv1.TagValue{int64TagValue(10), int64TagValue(10)}},
-                                       }},
-                               },
-                       }, {
-                               SID:        2,
-                               Timestamps: []int64{1, 1},
-                               ElementIDs: []string{"21", "21"},
-                               TagFamilies: []pbv1.TagFamily{
-                                       {Name: "singleTag", Tags: []pbv1.Tag{
-                                               {Name: "strTag1", Values: 
[]*modelv1.TagValue{strTagValue("tag1"), strTagValue("tag1")}},
-                                               {Name: "strTag2", Values: 
[]*modelv1.TagValue{strTagValue("tag2"), strTagValue("tag2")}},
-                                       }},
-                               },
-                       }, {
-                               SID:         3,
-                               Timestamps:  []int64{1, 1},
-                               ElementIDs:  []string{"31", "31"},
-                               TagFamilies: nil,
-                       }},
-               },
-               {
-                       name:          "Test with multiple parts with multiple 
data order by Series",
-                       esList:        []*elements{esTS1, esTS2},
-                       sids:          []common.SeriesID{2, 1, 3},
-                       orderBySeries: true,
-                       minTimestamp:  1,
-                       maxTimestamp:  2,
-                       want: []pbv1.StreamResult{{
-                               SID:        2,
-                               Timestamps: []int64{2, 1},
-                               ElementIDs: []string{"22", "21"},
-                               TagFamilies: []pbv1.TagFamily{
-                                       {Name: "singleTag", Tags: []pbv1.Tag{
-                                               {Name: "strTag1", Values: 
[]*modelv1.TagValue{strTagValue("tag3"), strTagValue("tag1")}},
-                                               {Name: "strTag2", Values: 
[]*modelv1.TagValue{strTagValue("tag4"), strTagValue("tag2")}},
-                                       }},
-                               },
-                       }, {
-                               SID:        1,
-                               Timestamps: []int64{1, 2},
-                               ElementIDs: []string{"11", "12"},
-                               TagFamilies: []pbv1.TagFamily{
-                                       {Name: "arrTag", Tags: []pbv1.Tag{
-                                               {Name: "strArrTag", Values: 
[]*modelv1.TagValue{strArrTagValue([]string{"value1", "value2"}), 
strArrTagValue([]string{"value5", "value6"})}},
-                                               {Name: "intArrTag", Values: 
[]*modelv1.TagValue{int64ArrTagValue([]int64{25, 30}), 
int64ArrTagValue([]int64{35, 40})}},
-                                       }},
-                                       {Name: "binaryTag", Tags: []pbv1.Tag{
-                                               {Name: "binaryTag", Values: 
[]*modelv1.TagValue{binaryDataTagValue(longText), 
binaryDataTagValue(longText)}},
-                                       }},
-                                       {Name: "singleTag", Tags: []pbv1.Tag{
-                                               {Name: "strTag", Values: 
[]*modelv1.TagValue{strTagValue("value1"), strTagValue("value3")}},
-                                               {Name: "intTag", Values: 
[]*modelv1.TagValue{int64TagValue(10), int64TagValue(30)}},
-                                       }},
-                               },
-                       }, {
-                               SID:         3,
-                               Timestamps:  []int64{2, 1},
-                               ElementIDs:  []string{"32", "31"},
-                               TagFamilies: nil,
-                       }},
-               },
        }
        bma := generateBlockMetadataArray()
        defer releaseBlockMetadataArray(bma)
@@ -365,7 +283,7 @@ func TestQueryResult(t *testing.T) {
                                        }
                                } else {
                                        result.orderByTS = true
-                                       result.ascTS = tt.ascTS
+                                       result.asc = tt.ascTS
                                }
                                var got []pbv1.StreamResult
                                for {
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index 8c6f1b7a..7181ed36 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -61,8 +61,6 @@ type Stream interface {
        GetSchema() *databasev1.Stream
        GetIndexRules() []*databasev1.IndexRule
        Query(ctx context.Context, opts pbv1.StreamQueryOptions) 
(pbv1.StreamQueryResult, error)
-       Sort(ctx context.Context, opts pbv1.StreamQueryOptions) 
(pbv1.StreamSortResult, error)
-       Filter(ctx context.Context, opts pbv1.StreamQueryOptions) 
(pbv1.StreamQueryResult, error)
 }
 
 var _ Stream = (*stream)(nil)
diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go
index dda0a57b..6fbad53a 100644
--- a/banyand/stream/tstable.go
+++ b/banyand/stream/tstable.go
@@ -34,7 +34,6 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
-       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
        "github.com/apache/skywalking-banyandb/pkg/watcher"
@@ -284,25 +283,6 @@ func (tst *tsTable) mustAddElements(es *elements) {
        }
 }
 
-func (tst *tsTable) getElement(seriesID common.SeriesID, timestamp int64, 
tagProjection []pbv1.TagProjection) (*element, int, error) {
-       s := tst.currentSnapshot()
-       if s == nil {
-               return nil, 0, fmt.Errorf("snapshot is absent, cannot find 
element with seriesID %d and timestamp %d", seriesID, timestamp)
-       }
-       defer s.decRef()
-
-       for _, p := range s.parts {
-               if !p.p.containTimestamp(timestamp) {
-                       continue
-               }
-               elem, count, err := p.p.getElement(seriesID, timestamp, 
tagProjection)
-               if err == nil {
-                       return elem, count, nil
-               }
-       }
-       return nil, 0, fmt.Errorf("cannot find element with seriesID %d and 
timestamp %d", seriesID, timestamp)
-}
-
 type tstIter struct {
        err           error
        parts         []*part
diff --git a/pkg/index/index.go b/pkg/index/index.go
index 43883931..ff38d5d9 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -31,6 +31,7 @@ import (
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/index/posting"
+       "github.com/apache/skywalking-banyandb/pkg/iter/sort"
 )
 
 var errMalformed = errors.New("the data is malformed")
@@ -163,10 +164,22 @@ func (r RangeOpts) Between(value []byte) int {
        return 0
 }
 
+// ItemRef represents a reference to an item.
+type ItemRef struct {
+       Term     []byte
+       SeriesID common.SeriesID
+       DocID    uint64
+}
+
+// SortedField returns the value of the sorted field.
+func (ir ItemRef) SortedField() []byte {
+       return ir.Term
+}
+
 // FieldIterator allows iterating over a field's posting values.
-type FieldIterator interface {
+type FieldIterator[T sort.Comparable] interface {
        Next() bool
-       Val() (uint64, common.SeriesID, []byte)
+       Val() T
        Close() error
 }
 
@@ -179,8 +192,8 @@ func (i *dummyIterator) Next() bool {
        return false
 }
 
-func (i *dummyIterator) Val() (uint64, common.SeriesID, []byte) {
-       return 0, 0, nil
+func (i *dummyIterator) Val() *ItemRef {
+       return &ItemRef{}
 }
 
 func (i *dummyIterator) Close() error {
@@ -211,8 +224,8 @@ type Writer interface {
 
 // FieldIterable allows building a FieldIterator.
 type FieldIterable interface {
-       Iterator(fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort, 
preLoadSize int) (iter FieldIterator, err error)
-       Sort(sids []common.SeriesID, fieldKey FieldKey, order modelv1.Sort, 
preLoadSize int) (FieldIterator, error)
+       Iterator(fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort, 
preLoadSize int) (iter FieldIterator[*ItemRef], err error)
+       Sort(sids []common.SeriesID, fieldKey FieldKey, order modelv1.Sort, 
preLoadSize int) (FieldIterator[*ItemRef], error)
 }
 
 // Searcher allows searching a field either by its key or by its key and term.
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index ceb9e739..b1d93d73 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -160,7 +160,7 @@ func (s *store) Write(fields []index.Field, docID uint64) 
error {
        return nil
 }
 
-func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, 
order modelv1.Sort, preLoadSize int) (iter index.FieldIterator, err error) {
+func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, 
order modelv1.Sort, preLoadSize int) (iter index.FieldIterator[*index.ItemRef], 
err error) {
        if termRange.Lower != nil &&
                termRange.Upper != nil &&
                bytes.Compare(termRange.Lower, termRange.Upper) > 0 {
@@ -241,8 +241,7 @@ func (s *store) MatchTerms(field index.Field) (list 
posting.List, err error) {
        }()
        list = roaring.NewPostingList()
        for iter.Next() {
-               docID, _, _ := iter.Val()
-               list.Insert(docID)
+               list.Insert(iter.Val().DocID)
        }
        return list, err
 }
@@ -275,8 +274,7 @@ func (s *store) Match(fieldKey index.FieldKey, matches 
[]string) (posting.List,
        }()
        list := roaring.NewPostingList()
        for iter.Next() {
-               docID, _, _ := iter.Val()
-               list.Insert(docID)
+               list.Insert(iter.Val().DocID)
        }
        return list, err
 }
@@ -288,8 +286,7 @@ func (s *store) Range(fieldKey index.FieldKey, opts 
index.RangeOpts) (list posti
        }
        list = roaring.NewPostingList()
        for iter.Next() {
-               docID, _, _ := iter.Val()
-               list.Insert(docID)
+               list.Insert(iter.Val().DocID)
        }
        err = multierr.Append(err, iter.Close())
        return
@@ -456,8 +453,12 @@ func (bmi *blugeMatchIterator) Next() bool {
        return bmi.err == nil
 }
 
-func (bmi *blugeMatchIterator) Val() (uint64, common.SeriesID, []byte) {
-       return bmi.docID, bmi.seriesID, bmi.term
+func (bmi *blugeMatchIterator) Val() *index.ItemRef {
+       return &index.ItemRef{
+               SeriesID: bmi.seriesID,
+               DocID:    bmi.docID,
+               Term:     bmi.term,
+       }
 }
 
 func (bmi *blugeMatchIterator) Close() error {
diff --git a/pkg/index/inverted/sort.go b/pkg/index/inverted/sort.go
index 72291ab9..0447e255 100644
--- a/pkg/index/inverted/sort.go
+++ b/pkg/index/inverted/sort.go
@@ -31,7 +31,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/index"
 )
 
-func (s *store) Sort(sids []common.SeriesID, fieldKey index.FieldKey, order 
modelv1.Sort, preLoadSize int) (iter index.FieldIterator, err error) {
+func (s *store) Sort(sids []common.SeriesID, fieldKey index.FieldKey, order 
modelv1.Sort, preLoadSize int) (iter index.FieldIterator[*index.ItemRef], err 
error) {
        reader, err := s.writer.Reader()
        if err != nil {
                return nil, err
@@ -127,7 +127,7 @@ func (si *sortIterator) next() bool {
        return false
 }
 
-func (si *sortIterator) Val() (uint64, common.SeriesID, []byte) {
+func (si *sortIterator) Val() *index.ItemRef {
        return si.current.Val()
 }
 
diff --git a/pkg/index/inverted/sort_test.go b/pkg/index/inverted/sort_test.go
index 6fe759aa..4c67b994 100644
--- a/pkg/index/inverted/sort_test.go
+++ b/pkg/index/inverted/sort_test.go
@@ -167,10 +167,10 @@ func TestStore_Sort(t *testing.T) {
                        is.NotNil(iter)
                        var got result
                        for iter.Next() {
-                               docID, _, term := iter.Val()
-                               got.items = append(got.items, docID)
-                               if term != nil {
-                                       got.terms = append(got.terms, term)
+                               val := iter.Val()
+                               got.items = append(got.items, val.DocID)
+                               if val.Term != nil {
+                                       got.terms = append(got.terms, val.Term)
                                }
                        }
                        for i := 0; i < 10; i++ {
diff --git a/pkg/index/testcases/duration.go b/pkg/index/testcases/duration.go
index 944b2e4c..569acbf6 100644
--- a/pkg/index/testcases/duration.go
+++ b/pkg/index/testcases/duration.go
@@ -303,8 +303,7 @@ func RunDuration(t *testing.T, data map[int]posting.List, 
store SimpleStore) {
                        is.NotNil(iter)
                        var got result
                        for iter.Next() {
-                               docID, _, _ := iter.Val()
-                               got.items = append(got.items, docID)
+                               got.items = append(got.items, iter.Val().DocID)
                        }
                        for i := 0; i < 10; i++ {
                                is.False(iter.Next())
diff --git a/pkg/pb/v1/metadata.go b/pkg/pb/v1/metadata.go
index 3aaf08a5..200d52fc 100644
--- a/pkg/pb/v1/metadata.go
+++ b/pkg/pb/v1/metadata.go
@@ -115,14 +115,7 @@ type StreamResult struct {
        Timestamps  []int64
        ElementIDs  []string
        TagFamilies []TagFamily
-       SID         common.SeriesID
-}
-
-// StreamColumnResult is the result of a stream sort or filter.
-type StreamColumnResult struct {
-       TagFamilies [][]TagFamily
-       Timestamps  []int64
-       ElementIDs  []string
+       SIDs        []common.SeriesID
 }
 
 // TagProjection is the projection of a tag family and its tags.
@@ -148,11 +141,6 @@ type StreamQueryResult interface {
        Release()
 }
 
-// StreamSortResult is the result of a stream sort.
-type StreamSortResult interface {
-       Pull() *StreamColumnResult
-}
-
 // OrderByType is the type of order by.
 type OrderByType int
 
diff --git a/pkg/query/executor/interface.go b/pkg/query/executor/interface.go
index a5edf44c..f8234f7f 100644
--- a/pkg/query/executor/interface.go
+++ b/pkg/query/executor/interface.go
@@ -31,8 +31,6 @@ import (
 // StreamExecutionContext allows retrieving data through the stream module.
 type StreamExecutionContext interface {
        Query(ctx context.Context, opts pbv1.StreamQueryOptions) 
(pbv1.StreamQueryResult, error)
-       Sort(ctx context.Context, opts pbv1.StreamQueryOptions) 
(pbv1.StreamSortResult, error)
-       Filter(ctx context.Context, opts pbv1.StreamQueryOptions) 
(pbv1.StreamQueryResult, error)
 }
 
 // StreamExecutionContextKey is the key of stream execution context in 
context.Context.
diff --git a/pkg/query/logical/stream/stream_plan_indexscan_local.go 
b/pkg/query/logical/stream/stream_plan_indexscan_local.go
index 2e7a39fb..5a3032b0 100644
--- a/pkg/query/logical/stream/stream_plan_indexscan_local.go
+++ b/pkg/query/logical/stream/stream_plan_indexscan_local.go
@@ -71,56 +71,20 @@ func (i *localIndexScan) Execute(ctx context.Context) 
([]*streamv1.Element, erro
                }
        }
        ec := executor.FromStreamExecutionContext(ctx)
-
-       if i.order != nil && i.order.Index != nil {
-               ssr, err := ec.Sort(ctx, pbv1.StreamQueryOptions{
-                       Name:           i.metadata.GetName(),
-                       TimeRange:      &i.timeRange,
-                       Entities:       i.entities,
-                       Filter:         i.filter,
-                       Order:          orderBy,
-                       TagProjection:  i.projectionTags,
-                       MaxElementSize: i.maxElementSize,
-               })
-               if err != nil {
-                       return nil, err
-               }
-               if ssr == nil {
-                       return nil, nil
-               }
-               r := ssr.Pull()
-               return buildElementsFromColumnResult(r), nil
-       }
-
-       if i.filter != nil && i.filter != logical.ENode {
-               result, err := ec.Filter(ctx, pbv1.StreamQueryOptions{
-                       Name:           i.metadata.GetName(),
-                       TimeRange:      &i.timeRange,
-                       Entities:       i.entities,
-                       Filter:         i.filter,
-                       Order:          orderBy,
-                       TagProjection:  i.projectionTags,
-                       MaxElementSize: i.maxElementSize,
-               })
-               if err != nil {
-                       return nil, err
-               }
-               if result == nil {
-                       return nil, nil
-               }
-               return BuildElementsFromStreamResult(result), nil
-       }
-
        result, err := ec.Query(ctx, pbv1.StreamQueryOptions{
-               Name:          i.metadata.GetName(),
-               TimeRange:     &i.timeRange,
-               Entities:      i.entities,
-               Filter:        i.filter,
-               Order:         orderBy,
-               TagProjection: i.projectionTags,
+               Name:           i.metadata.GetName(),
+               TimeRange:      &i.timeRange,
+               Entities:       i.entities,
+               Filter:         i.filter,
+               Order:          orderBy,
+               TagProjection:  i.projectionTags,
+               MaxElementSize: i.maxElementSize,
        })
        if err != nil {
-               return nil, fmt.Errorf("failed to query stream: %w", err)
+               return nil, err
+       }
+       if result == nil {
+               return nil, nil
        }
        return BuildElementsFromStreamResult(result), nil
 }
@@ -142,30 +106,6 @@ func (i *localIndexScan) Schema() logical.Schema {
        return i.schema.ProjTags(i.projectionTagRefs...)
 }
 
-func buildElementsFromColumnResult(r *pbv1.StreamColumnResult) (elements 
[]*streamv1.Element) {
-       for i := range r.Timestamps {
-               e := &streamv1.Element{
-                       Timestamp: timestamppb.New(time.Unix(0, 
r.Timestamps[i])),
-                       ElementId: r.ElementIDs[i],
-               }
-
-               for _, tf := range r.TagFamilies[i] {
-                       tagFamily := &modelv1.TagFamily{
-                               Name: tf.Name,
-                       }
-                       e.TagFamilies = append(e.TagFamilies, tagFamily)
-                       for _, t := range tf.Tags {
-                               tagFamily.Tags = append(tagFamily.Tags, 
&modelv1.Tag{
-                                       Key:   t.Name,
-                                       Value: t.Values[0],
-                               })
-                       }
-               }
-               elements = append(elements, e)
-       }
-       return
-}
-
 // BuildElementsFromStreamResult builds a slice of elements from the given 
stream query result.
 func BuildElementsFromStreamResult(result pbv1.StreamQueryResult) (elements 
[]*streamv1.Element) {
        deduplication := make(map[string]struct{})

Reply via email to