This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new c1905988 Fix a bug and update API (#645)
c1905988 is described below

commit c190598804f9dd651b19a188cac2869bf4d8f5c7
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Mon Apr 14 11:10:02 2025 +0800

    Fix a bug and update API (#645)
---
 CHANGES.md                                |   5 +
 api/proto/banyandb/common/v1/common.proto |   4 +-
 api/proto/banyandb/measure/v1/query.proto |   4 +-
 api/proto/banyandb/property/v1/rpc.proto  |   2 -
 api/proto/banyandb/stream/v1/query.proto  |   4 +-
 api/proto/banyandb/version.go             |   2 +-
 banyand/internal/storage/rotation.go      | 103 ++++---
 banyand/internal/storage/rotation_test.go |  26 +-
 banyand/internal/storage/segment.go       | 200 ++++++++++---
 banyand/internal/storage/segment_test.go  | 481 ++++++++++++++++++++++++++++++
 banyand/internal/storage/shard.go         |   2 +-
 banyand/internal/storage/storage.go       |   2 +-
 banyand/internal/storage/tsdb.go          |  16 +-
 banyand/internal/storage/tsdb_test.go     |   7 +-
 banyand/liaison/grpc/discovery.go         |  28 +-
 banyand/liaison/grpc/measure.go           |  12 +-
 banyand/liaison/grpc/stream.go            |  12 +-
 banyand/measure/metadata.go               |  11 +-
 banyand/measure/query.go                  |   5 +-
 banyand/queue/pub/pub.go                  |  25 +-
 banyand/queue/pub/pub_test.go             |  16 +-
 banyand/queue/pub/selector.go             |   3 +
 banyand/stream/metadata.go                |  11 +-
 banyand/stream/query.go                   | 193 ++++++++----
 docs/api-reference.md                     |   7 +-
 pkg/bus/bus.go                            |   6 +-
 26 files changed, 971 insertions(+), 216 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index f252e132..a0cecbf9 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -7,6 +7,11 @@ Release Notes.
 ### Features
 
 - Add sharding_key for TopNAggregation source measure
+- API: Update the query routing rule from node selectors to lifecycle stage names.
+
+### Bug Fixes
+
+- Fix the deadlock issue when loading a closed segment.
 
 ## 0.8.0
 
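(For context, a hedged sketch of the client-visible change; everything except
the new "stages" field is inferred from the existing measure QueryRequest
schema, and the group and measure names are illustrative.)

    // v0.8: a query pinned nodes directly via a raw selector string.
    req := &measurev1.QueryRequest{
            Groups:       []string{"sw_metric"},
            Name:         "service_cpm_minute",
            NodeSelector: "type=hot", // removed by this commit
    }

    // v0.9: a query names lifecycle stages; the liaison resolves each stage
    // to its node selector from the group's ResourceOpts.
    req = &measurev1.QueryRequest{
            Groups: []string{"sw_metric"},
            Name:   "service_cpm_minute",
            Stages: []string{"hot", "warm"},
    }
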
diff --git a/api/proto/banyandb/common/v1/common.proto b/api/proto/banyandb/common/v1/common.proto
index 24846c56..2c6e5d75 100644
--- a/api/proto/banyandb/common/v1/common.proto
+++ b/api/proto/banyandb/common/v1/common.proto
@@ -93,8 +93,8 @@ message ResourceOpts {
   IntervalRule ttl = 3;
   // stages defines the ordered lifecycle stages. Data progresses through these stages sequentially.
   repeated LifecycleStage stages = 4;
-  // default_node_selector is the default node selector for queries if node_selector is not specified
-  string default_node_selector = 5;
+  // default_stages are the names of the default stages
+  repeated string default_stages = 5;
 }
 
 // Group is an internal object for Group management
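(A hedged sketch of the new field in use, assuming the generated Go type
commonv1.ResourceOpts and the LifecycleStage fields referenced elsewhere in
this commit; the stage names and selectors are illustrative.)

    opts := &commonv1.ResourceOpts{
            ShardNum: 2,
            Stages: []*commonv1.LifecycleStage{
                    {Name: "hot", NodeSelector: "type=hot"},
                    {Name: "warm", NodeSelector: "type=warm"},
            },
            // Queries that carry no explicit stages fall back to these names.
            DefaultStages: []string{"hot"},
    }
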
diff --git a/api/proto/banyandb/measure/v1/query.proto b/api/proto/banyandb/measure/v1/query.proto
index 4537bcf5..05f8309b 100644
--- a/api/proto/banyandb/measure/v1/query.proto
+++ b/api/proto/banyandb/measure/v1/query.proto
@@ -111,6 +111,6 @@ message QueryRequest {
   model.v1.QueryOrder order_by = 12;
   // trace is used to enable trace for the query
   bool trace = 13;
-  // node_selector is used to specify the target node for the query
-  string node_selector = 14;
+  // stages is used to specify the stages of the data points in the lifecycle
+  repeated string stages = 14;
 }
diff --git a/api/proto/banyandb/property/v1/rpc.proto b/api/proto/banyandb/property/v1/rpc.proto
index 01c032dd..8f70291b 100644
--- a/api/proto/banyandb/property/v1/rpc.proto
+++ b/api/proto/banyandb/property/v1/rpc.proto
@@ -78,8 +78,6 @@ message QueryRequest {
   uint32 limit = 6;
   // trace is used to enable trace for the query
   bool trace = 7;
-  // node_selector is used to select the node to query
-  string node_selector = 8;
 }
 
 // QueryResponse is the response for a query to the Query module.
diff --git a/api/proto/banyandb/stream/v1/query.proto b/api/proto/banyandb/stream/v1/query.proto
index 4fe18f9e..ae23dd33 100644
--- a/api/proto/banyandb/stream/v1/query.proto
+++ b/api/proto/banyandb/stream/v1/query.proto
@@ -77,6 +77,6 @@ message QueryRequest {
   model.v1.TagProjection projection = 8 [(validate.rules).message.required = true];
   // trace is used to enable trace for the query
   bool trace = 9;
-  // node_selector is used to select the node to query
-  string node_selector = 10;
+  // stages is used to specify the stages of the elements in the lifecycle
+  repeated string stages = 10;
 }
diff --git a/api/proto/banyandb/version.go b/api/proto/banyandb/version.go
index 7b89d508..2a4b305f 100644
--- a/api/proto/banyandb/version.go
+++ b/api/proto/banyandb/version.go
@@ -19,7 +19,7 @@
 package apiversion
 
 // Version is the version of the API.
-const Version = "0.8"
+const Version = "0.9"
 
 // Revision is the revision of the API. Building with -ldflags -X.
 var revision string
diff --git a/banyand/internal/storage/rotation.go b/banyand/internal/storage/rotation.go
index e8c6df9f..9d77b339 100644
--- a/banyand/internal/storage/rotation.go
+++ b/banyand/internal/storage/rotation.go
@@ -49,47 +49,80 @@ func (d *database[T, O]) startRotationTask() error {
                rt = newRetentionTask(d, options.TTL)
        }
        go func(rt *retentionTask[T, O]) {
-               for ts := range d.tsEventCh {
-                       func(ts int64) {
-                               d.rotationProcessOn.Store(true)
-                               defer d.rotationProcessOn.Store(false)
-                               t := time.Unix(0, ts)
-                               if rt != nil {
-                                       rt.run(t, d.logger)
+               var idleCheckTicker *time.Ticker
+               var idleCheckC <-chan time.Time
+
+               // Only create the ticker if idleTimeout is at least 1 second
+               if d.segmentController.idleTimeout >= time.Second {
+                       idleCheckTicker = time.NewTicker(10 * time.Minute)
+                       idleCheckC = idleCheckTicker.C
+                       defer func() {
+                               if idleCheckTicker != nil {
+                                       idleCheckTicker.Stop()
                                }
-                               func() {
-                                       ss := d.segmentController.segments()
-                                       if len(ss) == 0 {
-                                               return
+                       }()
+               }
+
+               for {
+                       select {
+                       case ts, ok := <-d.tsEventCh:
+                               if !ok {
+                                       d.logger.Debug().Msg("tsEventCh closed")
+                                       return
+                               }
+                               func(ts int64) {
+                                       d.rotationProcessOn.Store(true)
+                                       defer d.rotationProcessOn.Store(false)
+                                       t := time.Unix(0, ts)
+                                       if rt != nil {
+                                               rt.run(t, d.logger)
                                        }
-                                       defer func() {
-                                               for i := 0; i < len(ss); i++ {
-                                                       ss[i].DecRef()
+                                       func() {
+                                               ss, err := d.segmentController.segments(true) // Ensure segments are open
+                                               if err != nil {
+                                                       d.logger.Error().Err(err).Msg("failed to get segments")
+                                                       return
                                                }
-                                       }()
-                                       for i := range ss {
-                                               if ss[i].End.UnixNano() < ts {
-                                                       ss[i].index.store.Reset()
+                                               if len(ss) == 0 {
+                                                       return
                                                }
-                                       }
-                                       latest := ss[len(ss)-1]
-                                       gap := latest.End.UnixNano() - ts
-                                       // gap <=0 means the event is from the future
-                                       // the segment will be created by a written event directly
-                                       if gap <= 0 || gap > newSegmentTimeGap {
-                                               return
-                                       }
-                                       d.incTotalRotationStarted(1)
-                                       defer d.incTotalRotationFinished(1)
-                                       start := options.SegmentInterval.nextTime(t)
-                                       d.logger.Info().Time("segment_start", start).Time("event_time", t).Msg("create new segment")
-                                       _, err := d.segmentController.create(start)
-                                       if err != nil {
-                                               d.logger.Error().Err(err).Msgf("failed to create new segment.")
-                                               d.incTotalRotationErr(1)
+                                               defer func() {
+                                                       for i := 0; i < len(ss); i++ {
+                                                               ss[i].DecRef()
+                                                       }
+                                               }()
+                                               for i := range ss {
+                                                       if ss[i].End.UnixNano() < ts {
+                                                               ss[i].index.store.Reset()
+                                                       }
+                                               }
+                                               latest := ss[len(ss)-1]
+                                               gap := latest.End.UnixNano() - ts
+                                               // gap <=0 means the event is from the future
+                                               // the segment will be created by a written event directly
+                                               if gap <= 0 || gap > newSegmentTimeGap {
+                                                       return
+                                               }
+                                               d.incTotalRotationStarted(1)
+                                               defer d.incTotalRotationFinished(1)
+                                               start := options.SegmentInterval.nextTime(t)
+                                               d.logger.Info().Time("segment_start", start).Time("event_time", t).Msg("create new segment")
+                                               _, err = d.segmentController.create(start)
+                                               if err != nil {
+                                                       d.logger.Error().Err(err).Msgf("failed to create new segment.")
+                                                       d.incTotalRotationErr(1)
+                                               }
+                                       }()
+                               }(ts)
+                       case <-idleCheckC:
+                               func() {
+                                       d.logger.Debug().Msg("checking for idle segments")
+                                       closedCount := d.segmentController.closeIdleSegments()
+                                       if closedCount > 0 {
+                                               d.logger.Info().Int("count", closedCount).Msg("closed idle segments")
                                        }
                                }()
-                       }(ts)
+                       }
                }
        }(rt)
        if rt == nil {
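(The rewritten loop leans on a standard Go idiom: receiving from a nil channel
blocks forever, so when idleTimeout is under one second idleCheckC stays nil
and the idle-check case simply never fires. A minimal standalone sketch of the
pattern; eventCh, handle, and periodicWork are placeholders.)

    func loop(eventCh <-chan int64, idleTimeout time.Duration, handle func(int64), periodicWork func()) {
            var tickC <-chan time.Time // nil channel: its select case is disabled
            if idleTimeout >= time.Second {
                    ticker := time.NewTicker(10 * time.Minute)
                    defer ticker.Stop()
                    tickC = ticker.C
            }
            for {
                    select {
                    case ev, ok := <-eventCh:
                            if !ok {
                                    return
                            }
                            handle(ev)
                    case <-tickC: // blocks forever while tickC is nil
                            periodicWork()
                    }
            }
    }
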
diff --git a/banyand/internal/storage/rotation_test.go b/banyand/internal/storage/rotation_test.go
index a0aed52e..43f292ef 100644
--- a/banyand/internal/storage/rotation_test.go
+++ b/banyand/internal/storage/rotation_test.go
@@ -81,8 +81,9 @@ func TestSegmentBoundaryUpdateFn(t *testing.T) {
                        t.Logf("current time: %s", ts.Format(time.RFC3339))
                        expected := i + 2
                        require.EventuallyWithTf(t, func(ct *assert.CollectT) {
-                               if len(segCtrl.segments()) != expected {
-                                       ct.Errorf("expect %d segments, got %d", expected, len(segCtrl.segments()))
+                               segments, _ := segCtrl.segments(false)
+                               if len(segments) != expected {
+                                       ct.Errorf("expect %d segments, got %d", expected, len(segments))
                                }
                        }, flags.EventuallyTimeout, time.Millisecond, "wait for %d segment to be created", expected)
                        ts = ts.Add(time.Hour)
@@ -111,7 +112,8 @@ func TestForwardRotation(t *testing.T) {
                t.Logf("current time: %s", ts.Format(time.RFC3339))
                tsdb.Tick(ts.UnixNano())
                assert.Eventually(t, func() bool {
-                       return len(segCtrl.segments()) == 2
+                       segments, _ := segCtrl.segments(false)
+                       return len(segments) == 2
                }, flags.EventuallyTimeout, time.Millisecond, "wait for the second segment to be created")
        })
 
@@ -122,7 +124,8 @@ func TestForwardRotation(t *testing.T) {
                t.Logf("current time: %s", ts.Format(time.RFC3339))
                tsdb.Tick(ts.UnixNano())
                assert.Never(t, func() bool {
-                       return len(segCtrl.segments()) == 2
+                       segments, _ := segCtrl.segments(false)
+                       return len(segments) == 2
                }, flags.NeverTimeout, time.Millisecond, "wait for the second segment never to be created")
        })
 }
@@ -140,8 +143,9 @@ func TestRetention(t *testing.T) {
                        tsdb.Tick(ts.UnixNano())
                        expected := i + 2
                        require.EventuallyWithTf(t, func(ct *assert.CollectT) {
-                               if len(segCtrl.segments()) != expected {
-                                       ct.Errorf("expect %d segments, got %d", expected, len(segCtrl.segments()))
+                               segments, _ := segCtrl.segments(false)
+                               if len(segments) != expected {
+                                       ct.Errorf("expect %d segments, got %d", expected, len(segments))
                                }
                        }, flags.EventuallyTimeout, time.Millisecond, "wait for %d segment to be created", expected)
                        // amend the time to the next day
@@ -151,7 +155,8 @@ func TestRetention(t *testing.T) {
                c.Set(ts)
                tsdb.Tick(ts.UnixNano())
                assert.Eventually(t, func() bool {
-                       return len(segCtrl.segments()) == 4
+                       segments, _ := segCtrl.segments(false)
+                       return len(segments) == 4
                }, flags.EventuallyTimeout, time.Millisecond, "wait for the 1st segment to be deleted")
        })
 
@@ -165,7 +170,7 @@ func TestRetention(t *testing.T) {
                        tsdb.Tick(ts.UnixNano())
                        ts = ts.Add(time.Hour)
                        require.EventuallyWithTf(t, func(ct *assert.CollectT) {
-                               ss := segCtrl.segments()
+                               ss, _ := segCtrl.segments(false)
                                defer func() {
                                        for i := range ss {
                                                ss[i].DecRef()
@@ -184,7 +189,7 @@ func TestRetention(t *testing.T) {
                        c.Set(ts)
                        tsdb.Tick(ts.UnixNano())
                        require.EventuallyWithTf(t, func(ct *assert.CollectT) {
-                               ss := segCtrl.segments()
+                               ss, _ := segCtrl.segments(false)
                                defer func() {
                                        for i := range ss {
                                                ss[i].DecRef()
@@ -233,7 +238,8 @@ func setUpDB(t *testing.T) (*database[*MockTSTable, any], timestamp.MockClock, *
        defer seg.DecRef()
 
        db := tsdb.(*database[*MockTSTable, any])
-       require.Equal(t, len(db.segmentController.segments()), 1)
+       segments, _ := db.segmentController.segments(false)
+       require.Equal(t, len(segments), 1)
        return db, mc, db.segmentController, tracker, func() {
                tsdb.Close()
                defFn()
diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go
index b96da252..27bb2059 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -44,17 +44,21 @@ import (
 // ErrExpiredData is returned when the data is expired.
 var ErrExpiredData = errors.New("expired data")
 
+// ErrSegmentClosed is returned when trying to access a closed segment.
+var ErrSegmentClosed = errors.New("segment closed")
+
 type segment[T TSTable, O any] struct {
-       metrics  any
-       option   O
-       creator  TSTableCreator[T, O]
-       l        *logger.Logger
-       index    *seriesIndex
-       sLst     atomic.Pointer[[]*shard[T]]
-       position common.Position
+       metrics      any
+       tsdbOpts     *TSDBOpts[T, O]
+       l            *logger.Logger
+       index        *seriesIndex
+       sLst         atomic.Pointer[[]*shard[T]]
+       indexMetrics *inverted.Metrics
+       position     common.Position
        timestamp.TimeRange
        suffix        string
        location      string
+       lastAccessed  atomic.Int64
        mu            sync.Mutex
        refCount      int32
        mustBeDeleted uint32
@@ -74,24 +78,20 @@ func (sc *segmentController[T, O]) openSegment(ctx context.Context, startTime, e
        })
        options := sc.getOptions()
        id := generateSegID(options.SegmentInterval.Unit, suffixInteger)
-       sir, err := newSeriesIndex(ctx, path, options.SeriesIndexFlushTimeoutSeconds, options.SeriesIndexCacheMaxBytes, sc.indexMetrics)
-       if err != nil {
-               return nil, errors.Wrap(errOpenDatabase, errors.WithMessage(err, "create series index controller failed").Error())
-       }
+
        s = &segment[T, O]{
-               id:        id,
-               location:  path,
-               suffix:    suffix,
-               TimeRange: timestamp.NewSectionTimeRange(startTime, endTime),
-               position:  p,
-               refCount:  1,
-               index:     sir,
-               metrics:   sc.metrics,
-               creator:   options.TSTableCreator,
-               option:    options.Option,
+               id:           id,
+               location:     path,
+               suffix:       suffix,
+               TimeRange:    timestamp.NewSectionTimeRange(startTime, endTime),
+               position:     p,
+               metrics:      sc.metrics,
+               indexMetrics: sc.indexMetrics,
+               tsdbOpts:     options,
        }
        s.l = logger.Fetch(ctx, s.String())
-       return s, s.loadShards(int(options.ShardNum))
+       s.lastAccessed.Store(time.Now().UnixNano())
+       return s, s.initialize(ctx)
 }
 
 func (s *segment[T, O]) loadShards(shardNum int) error {
@@ -104,7 +104,7 @@ func (s *segment[T, O]) loadShards(shardNum int) error {
                        return nil
                }
                s.l.Info().Int("shard_id", shardID).Msg("loaded a existed shard")
-               _, err = s.CreateTSTableIfNotExist(common.ShardID(shardID))
+               _, err = s.createShardIfNotExist(common.ShardID(shardID))
                return err
        })
 }
@@ -123,31 +123,99 @@ func (s *segment[T, O]) Tables() (tt []T) {
        return tt
 }
 
-func (s *segment[T, O]) incRef() {
+func (s *segment[T, O]) incRef(ctx context.Context) error {
+       s.lastAccessed.Store(time.Now().UnixNano())
+       if atomic.LoadInt32(&s.refCount) <= 0 {
+               return s.initialize(ctx)
+       }
        atomic.AddInt32(&s.refCount, 1)
+       return nil
+}
+
+func (s *segment[T, O]) initialize(ctx context.Context) error {
+       s.mu.Lock()
+       defer s.mu.Unlock()
+
+       if atomic.LoadInt32(&s.refCount) > 0 {
+               return nil
+       }
+
+       ctx = context.WithValue(ctx, logger.ContextKey, s.l)
+       ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
+               return s.position
+       })
+
+       sir, err := newSeriesIndex(ctx, s.location, s.tsdbOpts.SeriesIndexFlushTimeoutSeconds, s.tsdbOpts.SeriesIndexCacheMaxBytes, s.indexMetrics)
+       if err != nil {
+               return errors.Wrap(errOpenDatabase, errors.WithMessage(err, "create series index controller failed").Error())
+       }
+       s.index = sir
+
+       err = s.loadShards(int(s.tsdbOpts.ShardNum))
+       if err != nil {
+               s.index.Close()
+               s.index = nil
+               return errors.Wrap(errOpenDatabase, errors.WithMessage(err, "load shards failed").Error())
+       }
+       atomic.StoreInt32(&s.refCount, 1)
+
+       s.l.Info().Stringer("seg", s).Msg("segment initialized")
+       return nil
 }
 
 func (s *segment[T, O]) DecRef() {
-       n := atomic.AddInt32(&s.refCount, -1)
-       if n > 0 {
+       shouldCleanup := false
+
+       if atomic.LoadInt32(&s.refCount) <= 0 && atomic.LoadUint32(&s.mustBeDeleted) != 0 {
+               shouldCleanup = true
+       } else {
+               for {
+                       current := atomic.LoadInt32(&s.refCount)
+                       if current <= 0 {
+                               return
+                       }
+
+                       if atomic.CompareAndSwapInt32(&s.refCount, current, current-1) {
+                               shouldCleanup = current == 1
+                               break
+                       }
+               }
+       }
+
+       if !shouldCleanup {
                return
        }
+
+       s.performCleanup()
+}
+
+func (s *segment[T, O]) performCleanup() {
        s.mu.Lock()
        defer s.mu.Unlock()
 
+       if atomic.LoadInt32(&s.refCount) > 0 && atomic.LoadUint32(&s.mustBeDeleted) == 0 {
+               return
+       }
+
        deletePath := ""
        if atomic.LoadUint32(&s.mustBeDeleted) != 0 {
                deletePath = s.location
        }
 
-       if err := s.index.Close(); err != nil {
-               s.l.Panic().Err(err).Msg("failed to close the series index")
+       if s.index != nil {
+               if err := s.index.Close(); err != nil {
+                       s.l.Panic().Err(err).Msg("failed to close the series index")
+               }
+               s.index = nil
        }
 
        sLst := s.sLst.Load()
        if sLst != nil {
-               for _, s := range *sLst {
-                       s.close()
+               for _, shard := range *sLst {
+                       shard.close()
+               }
+               if deletePath == "" {
+                       s.sLst.Store(&[]*shard[T]{})
                }
        }
 
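(Since segments can now be closed and transparently reopened, every reader
must pair the reference it acquires with a DecRef so the cleanup above can run
once the count drops to zero. A hedged usage sketch against the new
selectSegments signature:)

    segs, err := sc.selectSegments(timeRange) // takes a ref per segment, reopening closed ones
    if err != nil {
            return err
    }
    defer func() {
            for i := range segs {
                    segs[i].DecRef() // the last release triggers performCleanup
            }
    }()
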
@@ -167,6 +235,10 @@ func (s *segment[T, O]) CreateTSTableIfNotExist(id common.ShardID) (T, error) {
        }
        s.mu.Lock()
        defer s.mu.Unlock()
+       return s.createShardIfNotExist(id)
+}
+
+func (s *segment[T, O]) createShardIfNotExist(id common.ShardID) (T, error) {
        if s, ok := s.getShard(id); ok {
                return s.table, nil
        }
@@ -218,13 +290,14 @@ type segmentController[T TSTable, O any] struct {
        location                string
        lst                     []*segment[T, O]
        deadline                atomic.Int64
+       idleTimeout             time.Duration
        optsMutex               sync.RWMutex
        sync.RWMutex
 }
 
 func newSegmentController[T TSTable, O any](ctx context.Context, location string,
        l *logger.Logger, opts TSDBOpts[T, O], indexMetrics *inverted.Metrics, metrics Metrics,
-       segmentsBoundaryUpdateFn SegmentBoundaryUpdateFn,
+       segmentsBoundaryUpdateFn SegmentBoundaryUpdateFn, idleTimeout time.Duration,
 ) *segmentController[T, O] {
        clock, _ := timestamp.GetClock(ctx)
        p := common.GetPosition(ctx)
@@ -239,6 +312,7 @@ func newSegmentController[T TSTable, O any](ctx context.Context, location string
                segmentBoundaryUpdateFn: segmentsBoundaryUpdateFn,
                stage:                   p.Stage,
                db:                      p.Database,
+               idleTimeout:             idleTimeout,
        }
 }
 
@@ -261,21 +335,24 @@ func (sc *segmentController[T, O]) updateOptions(resourceOpts *commonv1.Resource
        sc.opts.ShardNum = resourceOpts.ShardNum
 }
 
-func (sc *segmentController[T, O]) selectSegments(timeRange timestamp.TimeRange) (tt []Segment[T, O]) {
+func (sc *segmentController[T, O]) selectSegments(timeRange timestamp.TimeRange) (tt []Segment[T, O], err error) {
        sc.RLock()
        defer sc.RUnlock()
        last := len(sc.lst) - 1
+       ctx := context.WithValue(context.Background(), logger.ContextKey, sc.l)
        for i := range sc.lst {
                s := sc.lst[last-i]
                if s.GetTimeRange().End.Before(timeRange.Start) {
                        break
                }
                if s.Overlapping(timeRange) {
-                       s.incRef()
+                       if err = s.incRef(ctx); err != nil {
+                               return nil, err
+                       }
                        tt = append(tt, s)
                }
        }
-       return tt
+       return tt, nil
 }
 
 func (sc *segmentController[T, O]) createSegment(ts time.Time) (*segment[T, O], error) {
@@ -287,16 +364,16 @@ func (sc *segmentController[T, O]) createSegment(ts time.Time) (*segment[T, O],
        if err != nil {
                return nil, err
        }
-       s.incRef()
        sc.notifySegmentBoundaryUpdate()
-       return s, nil
+       return s, s.incRef(context.WithValue(context.Background(), logger.ContextKey, sc.l))
 }
 
 func (sc *segmentController[T, O]) notifySegmentBoundaryUpdate() {
        if sc.segmentBoundaryUpdateFn == nil {
                return
        }
-       segs := sc.segments()
+       // No error can be returned when closed segments are not reopened.
+       segs, _ := sc.segments(false)
        defer func() {
                for i := range segs {
                        segs[i].DecRef()
@@ -312,15 +389,49 @@ func (sc *segmentController[T, O]) notifySegmentBoundaryUpdate() {
        sc.segmentBoundaryUpdateFn(sc.stage, sc.db, tr)
 }
 
-func (sc *segmentController[T, O]) segments() (ss []*segment[T, O]) {
+func (sc *segmentController[T, O]) segments(reopenClosed bool) (ss []*segment[T, O], err error) {
        sc.RLock()
        defer sc.RUnlock()
        r := make([]*segment[T, O], len(sc.lst))
+       ctx := context.WithValue(context.Background(), logger.ContextKey, sc.l)
        for i := range sc.lst {
-               sc.lst[i].incRef()
+               if reopenClosed {
+                       if err = sc.lst[i].incRef(ctx); err != nil {
+                               return nil, err
+                       }
+               } else {
+                       if atomic.LoadInt32(&sc.lst[i].refCount) > 0 {
+                               atomic.AddInt32(&sc.lst[i].refCount, 1)
+                       }
+               }
                r[i] = sc.lst[i]
        }
-       return r
+       return r, nil
+}
+
+func (sc *segmentController[T, O]) closeIdleSegments() int {
+       maxIdleTime := sc.idleTimeout
+
+       now := time.Now().UnixNano()
+       idleThreshold := now - maxIdleTime.Nanoseconds()
+
+       segs, _ := sc.segments(false)
+       closedCount := 0
+
+       for _, seg := range segs {
+               lastAccess := seg.lastAccessed.Load()
+               // Only consider segments that have been idle for longer than the threshold
+               // and have active references (are not already closed)
+               if lastAccess < idleThreshold && atomic.LoadInt32(&seg.refCount) > 0 {
+                       seg.DecRef()
+               }
+               seg.DecRef()
+               if atomic.LoadInt32(&seg.refCount) == 0 {
+                       closedCount++
+               }
+       }
+
+       return closedCount
 }
 
 func (sc *segmentController[T, O]) format(tm time.Time) string {
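(The double DecRef in closeIdleSegments reads most easily as reference
arithmetic; a hedged walk-through for a segment whose steady-state count is 1:)

    // idle segment:   segments(false) -> 2; idle DecRef -> 1; snapshot DecRef -> 0, cleanup runs
    // active segment: segments(false) -> 2; snapshot DecRef -> 1, segment stays open
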
@@ -446,7 +557,8 @@ func (sc *segmentController[T, O]) load(start, end time.Time, root string) (seg
 }
 
 func (sc *segmentController[T, O]) remove(deadline time.Time) (hasSegment bool, err error) {
-       for _, s := range sc.segments() {
+       ss, _ := sc.segments(false)
+       for _, s := range ss {
                if s.Before(deadline) {
                        hasSegment = true
                        id := s.id
@@ -468,7 +580,8 @@ func (sc *segmentController[T, O]) getExpiredSegmentsTimeRange() *timestamp.Time
                IncludeStart: true,
                IncludeEnd:   false,
        }
-       for _, s := range sc.segments() {
+       ss, _ := sc.segments(false)
+       for _, s := range ss {
                if s.Before(deadline) {
                        if timeRange.Start.IsZero() {
                                timeRange.Start = s.Start
@@ -483,7 +596,8 @@ func (sc *segmentController[T, O]) getExpiredSegmentsTimeRange() *timestamp.Time
 func (sc *segmentController[T, O]) deleteExpiredSegments(timeRange timestamp.TimeRange) int64 {
        deadline := time.Now().Local().Add(-sc.opts.TTL.estimatedDuration())
        var count int64
-       for _, s := range sc.segments() {
+       ss, _ := sc.segments(false)
+       for _, s := range ss {
                if s.Before(deadline) && s.Overlapping(timeRange) {
                        s.delete()
                        sc.Lock()
diff --git a/banyand/internal/storage/segment_test.go b/banyand/internal/storage/segment_test.go
new file mode 100644
index 00000000..094bf475
--- /dev/null
+++ b/banyand/internal/storage/segment_test.go
@@ -0,0 +1,481 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+       "context"
+       "fmt"
+       "os"
+       "path/filepath"
+       "strconv"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+// mockTSTable is a minimal implementation of TSTable for testing.
+type mockTSTable struct {
+       ID common.ShardID
+}
+
+func (m mockTSTable) Close() error {
+       return nil
+}
+
+func (m mockTSTable) Collect(Metrics) {}
+
+func (m mockTSTable) TakeFileSnapshot(string) error { return nil }
+
+// mockTSTableOpener implements the necessary functions to open a TSTable.
+type mockTSTableOpener struct{}
+
+func setupTestEnvironment(t *testing.T) (string, func()) {
+       t.Helper()
+       tempDir := t.TempDir()
+
+       // Create test logger
+       err := logger.Init(logger.Logging{
+               Env:   "test",
+               Level: "info",
+       })
+       require.NoError(t, err)
+
+       return tempDir, func() {
+               // Cleanup function
+               os.RemoveAll(tempDir)
+       }
+}
+
+func TestSegmentOpenAndReopen(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+
+       ctx := context.Background()
+       l := logger.GetLogger("test-segment")
+       ctx = context.WithValue(ctx, logger.ContextKey, l)
+       ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
+               return common.Position{
+                       Database: "test-db",
+                       Stage:    "test-stage",
+               }
+       })
+
+       opts := TSDBOpts[mockTSTable, mockTSTableOpener]{
+               TSTableCreator: func(_ fs.FileSystem, _ string, _ common.Position, _ *logger.Logger,
+                       _ timestamp.TimeRange, _ mockTSTableOpener, _ any,
+               ) (mockTSTable, error) {
+                       return mockTSTable{ID: common.ShardID(0)}, nil
+               },
+               ShardNum: 2,
+               SegmentInterval: IntervalRule{
+                       Unit: DAY,
+                       Num:  1,
+               },
+               TTL: IntervalRule{
+                       Unit: DAY,
+                       Num:  7,
+               },
+               SeriesIndexFlushTimeoutSeconds: 10,
+               SeriesIndexCacheMaxBytes:       1024 * 1024,
+       }
+
+       sc := newSegmentController[mockTSTable, mockTSTableOpener](
+               ctx,
+               tempDir,
+               l,
+               opts,
+               nil,           // indexMetrics
+               nil,           // metrics
+               nil,           // segmentBoundaryUpdateFn
+               5*time.Minute, // idleTimeout
+       )
+
+       now := time.Now().UTC()
+       startTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
+       endTime := startTime.Add(24 * time.Hour)
+
+       // Create and open a segment
+       segmentPath := filepath.Join(tempDir, "segment-"+startTime.Format(dayFormat))
+       err := os.MkdirAll(segmentPath, DirPerm)
+       require.NoError(t, err)
+
+       // Write metadata file
+       metadataPath := filepath.Join(segmentPath, metadataFilename)
+       err = os.WriteFile(metadataPath, []byte(currentVersion), FilePerm)
+       require.NoError(t, err)
+
+       // Open segment
+       suffix := startTime.Format(dayFormat)
+       segment, err := sc.openSegment(ctx, startTime, endTime, segmentPath, suffix)
+       require.NoError(t, err)
+       require.NotNil(t, segment)
+
+       // Verify segment is open
+       assert.Greater(t, segment.refCount, int32(0))
+
+       // Close segment by decrementing reference count
+       initialRefCount := segment.refCount
+       segment.DecRef()
+
+       // Verify segment is closed (refCount reduced)
+       assert.Equal(t, initialRefCount-1, segment.refCount)
+
+       // Reopen segment
+       segment.incRef(ctx)
+
+       // Verify segment is properly reopened
+       assert.Equal(t, initialRefCount, segment.refCount)
+
+       // Verify we can still access segment data
+       assert.NotNil(t, segment.index)
+       assert.Equal(t, startTime, segment.Start)
+       assert.Equal(t, endTime, segment.End)
+
+       // Test that we can create a TSTable after reopening
+       table, err := segment.CreateTSTableIfNotExist(0)
+       require.NoError(t, err)
+       assert.Equal(t, common.ShardID(0), table.ID)
+}
+
+func TestSegmentCloseIfIdle(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+
+       ctx := context.Background()
+       l := logger.GetLogger("test-segment")
+       ctx = context.WithValue(ctx, logger.ContextKey, l)
+       ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
+               return common.Position{
+                       Database: "test-db",
+                       Stage:    "test-stage",
+               }
+       })
+
+       opts := TSDBOpts[mockTSTable, mockTSTableOpener]{
+               TSTableCreator: func(_ fs.FileSystem, _ string, _ common.Position, _ *logger.Logger,
+                       _ timestamp.TimeRange, _ mockTSTableOpener, _ any,
+               ) (mockTSTable, error) {
+                       return mockTSTable{ID: common.ShardID(0)}, nil
+               },
+               ShardNum: 2,
+               SegmentInterval: IntervalRule{
+                       Unit: DAY,
+                       Num:  1,
+               },
+               TTL: IntervalRule{
+                       Unit: DAY,
+                       Num:  7,
+               },
+               SeriesIndexFlushTimeoutSeconds: 10,
+               SeriesIndexCacheMaxBytes:       1024 * 1024,
+       }
+
+       sc := newSegmentController[mockTSTable, mockTSTableOpener](
+               ctx,
+               tempDir,
+               l,
+               opts,
+               nil,         // indexMetrics
+               nil,         // metrics
+               nil,         // segmentBoundaryUpdateFn
+               time.Second, // Set short idle timeout for testing
+       )
+
+       // Test time parameters
+       now := time.Now().UTC()
+       startTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
+       endTime := startTime.Add(24 * time.Hour)
+
+       // Create and open a segment
+       segmentPath := filepath.Join(tempDir, "segment-"+startTime.Format(dayFormat))
+       err := os.MkdirAll(segmentPath, DirPerm)
+       require.NoError(t, err)
+
+       // Write metadata file
+       metadataPath := filepath.Join(segmentPath, metadataFilename)
+       err = os.WriteFile(metadataPath, []byte(currentVersion), FilePerm)
+       require.NoError(t, err)
+
+       // Open segment
+       suffix := startTime.Format(dayFormat)
+       segment, err := sc.openSegment(ctx, startTime, endTime, segmentPath, suffix)
+       require.NoError(t, err)
+
+       // Force last access time to be in the past
+       segment.lastAccessed.Store(time.Now().Add(-time.Minute).UnixNano())
+
+       // Close if idle should succeed
+       segment.DecRef()
+
+       // Verify segment is closed
+       assert.Nil(t, segment.index)
+
+       // Test reopening the segment
+       segment.incRef(ctx)
+
+       // Verify segment is properly reopened
+       assert.NotNil(t, segment.index)
+       assert.Greater(t, segment.refCount, int32(0))
+}
+
+func TestCloseIdleAndSelectSegments(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+
+       ctx := context.Background()
+       l := logger.GetLogger("test-segment-controller")
+       ctx = context.WithValue(ctx, logger.ContextKey, l)
+       ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
+               return common.Position{
+                       Database: "test-db",
+                       Stage:    "test-stage",
+               }
+       })
+
+       opts := TSDBOpts[mockTSTable, mockTSTableOpener]{
+               TSTableCreator: func(_ fs.FileSystem, _ string, _ common.Position, _ *logger.Logger,
+                       _ timestamp.TimeRange, _ mockTSTableOpener, _ any,
+               ) (mockTSTable, error) {
+                       return mockTSTable{ID: common.ShardID(0)}, nil
+               },
+               ShardNum: 2,
+               SegmentInterval: IntervalRule{
+                       Unit: DAY,
+                       Num:  1,
+               },
+               TTL: IntervalRule{
+                       Unit: DAY,
+                       Num:  7,
+               },
+               SeriesIndexFlushTimeoutSeconds: 10,
+               SeriesIndexCacheMaxBytes:       1024 * 1024,
+       }
+
+       // Create segment controller with a short idle timeout (100ms)
+       idleTimeout := 100 * time.Millisecond
+       sc := newSegmentController[mockTSTable, mockTSTableOpener](
+               ctx,
+               tempDir,
+               l,
+               opts,
+               nil,         // indexMetrics
+               nil,         // metrics
+               nil,         // segmentBoundaryUpdateFn
+               idleTimeout, // short idle timeout
+       )
+
+       // Test time parameters
+       now := time.Now().UTC()
+       day1 := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
+       day2 := day1.Add(24 * time.Hour)
+       day3 := day2.Add(24 * time.Hour)
+
+       // Create multiple segments
+       createSegment := func(startTime time.Time) *segment[mockTSTable, mockTSTableOpener] {
+               segmentPath := filepath.Join(tempDir, "segment-"+startTime.Format(dayFormat))
+               err := os.MkdirAll(segmentPath, DirPerm)
+               require.NoError(t, err)
+
+               // Write metadata file
+               metadataPath := filepath.Join(segmentPath, metadataFilename)
+               err = os.WriteFile(metadataPath, []byte(currentVersion), FilePerm)
+               require.NoError(t, err)
+
+               // Open segment
+               suffix := startTime.Format(dayFormat)
+               segment, err := sc.openSegment(ctx, startTime, startTime.Add(24*time.Hour), segmentPath, suffix)
+               require.NoError(t, err)
+
+               // Add segment to controller's list
+               sc.Lock()
+               sc.lst = append(sc.lst, segment)
+               sc.sortLst()
+               sc.Unlock()
+
+               return segment
+       }
+
+       // Create three segments
+       seg1 := createSegment(day1)
+       seg2 := createSegment(day2)
+       seg3 := createSegment(day3)
+
+       // Verify we have three segments
+       require.Len(t, sc.lst, 3)
+
+       // Make sure all segments have reference counts > 0
+       require.Greater(t, seg1.refCount, int32(0))
+       require.Greater(t, seg2.refCount, int32(0))
+       require.Greater(t, seg3.refCount, int32(0))
+
+       // Force segments 1 and 3 to be idle (setting last accessed time in the past)
+       seg1.lastAccessed.Store(time.Now().Add(-time.Second).UnixNano())
+       seg3.lastAccessed.Store(time.Now().Add(-time.Second).UnixNano())
+
+       // Keep seg2 active
+       seg2.lastAccessed.Store(time.Now().UnixNano())
+
+       // Close idle segments
+       closedCount := sc.closeIdleSegments()
+
+       // We should have closed 2 segments (seg1 and seg3)
+       assert.Equal(t, 2, closedCount)
+
+       // Check that seg1 and seg3 are closed (index is nil)
+       assert.Nil(t, seg1.index)
+       assert.Nil(t, seg3.index)
+
+       // While seg2 remains open
+       assert.NotNil(t, seg2.index)
+
+       // Now select segments using the entire time range
+       timeRange := timestamp.NewInclusiveTimeRange(day1, day3.Add(24*time.Hour))
+       selectedSegments, err := sc.selectSegments(timeRange)
+       require.NoError(t, err)
+
+       // Should have selected all 3 segments
+       require.Len(t, selectedSegments, 3)
+
+       // Verify segments were reopened (they should have an index again)
+       for _, s := range selectedSegments {
+               seg := s.(*segment[mockTSTable, mockTSTableOpener])
+               assert.NotNil(t, seg.index, "Selected segment should be reopened with a valid index")
+               assert.Greater(t, seg.refCount, int32(0), "Selected segment should have positive reference count")
+               seg.DecRef() // Cleanup
+       }
+}
+
+func TestOpenExistingSegmentWithShards(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+
+       ctx := context.Background()
+       l := logger.GetLogger("test-segment-shards")
+       ctx = context.WithValue(ctx, logger.ContextKey, l)
+       ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
+               return common.Position{
+                       Database: "test-db",
+                       Stage:    "test-stage",
+               }
+       })
+
+       opts := TSDBOpts[mockTSTable, mockTSTableOpener]{
+               TSTableCreator: func(_ fs.FileSystem, location string, _ common.Position, _ *logger.Logger,
+                       _ timestamp.TimeRange, _ mockTSTableOpener, _ any,
+               ) (mockTSTable, error) {
+                       shardID := common.ShardID(0)
+                       // Extract shard ID from the path
+                       if shardPath := filepath.Base(location); len(shardPath) > 6 {
+                               if id, err := strconv.Atoi(shardPath[6:]); err == nil {
+                                       shardID = common.ShardID(id)
+                               }
+                       }
+                       return mockTSTable{ID: shardID}, nil
+               },
+               ShardNum: 2,
+               SegmentInterval: IntervalRule{
+                       Unit: DAY,
+                       Num:  1,
+               },
+               TTL: IntervalRule{
+                       Unit: DAY,
+                       Num:  7,
+               },
+               SeriesIndexFlushTimeoutSeconds: 10,
+               SeriesIndexCacheMaxBytes:       1024 * 1024,
+       }
+
+       sc := newSegmentController[mockTSTable, mockTSTableOpener](
+               ctx,
+               tempDir,
+               l,
+               opts,
+               nil,           // indexMetrics
+               nil,           // metrics
+               nil,           // segmentBoundaryUpdateFn
+               5*time.Minute, // idleTimeout
+       )
+
+       // Test time parameters
+       now := time.Now().UTC()
+       startTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
+       endTime := startTime.Add(24 * time.Hour)
+
+       // Create segment directory
+       suffix := startTime.Format(dayFormat)
+       segmentPath := filepath.Join(tempDir, "segment-"+suffix)
+       err := os.MkdirAll(segmentPath, DirPerm)
+       require.NoError(t, err)
+
+       // Write metadata file
+       metadataPath := filepath.Join(segmentPath, metadataFilename)
+       err = os.WriteFile(metadataPath, []byte(currentVersion), FilePerm)
+       require.NoError(t, err)
+
+       // Create shard directories
+       for i := 0; i < int(opts.ShardNum); i++ {
+               shardPath := filepath.Join(segmentPath, fmt.Sprintf("shard-%d", i))
+               err = os.MkdirAll(shardPath, DirPerm)
+               require.NoError(t, err)
+
+               // Add a metadata file to each shard
+               shardMetadataPath := filepath.Join(shardPath, metadataFilename)
+               err = os.WriteFile(shardMetadataPath, []byte(currentVersion), FilePerm)
+               require.NoError(t, err)
+       }
+
+       // Open the segment
+       segment, err := sc.openSegment(ctx, startTime, endTime, segmentPath, suffix)
+       require.NoError(t, err)
+       require.NotNil(t, segment)
+
+       // Verify both shards were loaded
+       shardList := segment.sLst.Load()
+       require.NotNil(t, shardList, "Shard list should not be nil")
+       require.Len(t, *shardList, 2, "Segment should have loaded two shards")
+
+       // Verify shard IDs
+       shardIDs := make([]common.ShardID, 2)
+       for i, shard := range *shardList {
+               shardIDs[i] = shard.id
+       }
+       assert.Contains(t, shardIDs, common.ShardID(0), "Shard 0 should be loaded")
+       assert.Contains(t, shardIDs, common.ShardID(1), "Shard 1 should be loaded")
+
+       // Verify tables can be retrieved
+       tables := segment.Tables()
+       require.Len(t, tables, 2, "Should have 2 tables")
+
+       // Make sure each table has the correct ID
+       tableIDs := make([]common.ShardID, 2)
+       for i, table := range tables {
+               tableIDs[i] = table.ID
+       }
+       assert.Contains(t, tableIDs, common.ShardID(0), "Table for shard 0 should exist")
+       assert.Contains(t, tableIDs, common.ShardID(1), "Table for shard 1 should exist")
+
+       // Clean up
+       segment.DecRef()
+}
diff --git a/banyand/internal/storage/shard.go b/banyand/internal/storage/shard.go
index a75db393..6d1c576f 100644
--- a/banyand/internal/storage/shard.go
+++ b/banyand/internal/storage/shard.go
@@ -43,7 +43,7 @@ func (s *segment[T, O]) openShard(ctx context.Context, id common.ShardID) (*shar
        l.Info().Int("shard_id", int(id)).Str("path", location).Msg("loading a shard")
        p := common.GetPosition(ctx)
        p.Shard = strconv.Itoa(int(id))
-       t, err := s.creator(lfs, location, p, l, s.TimeRange, s.option, s.metrics)
+       t, err := s.tsdbOpts.TSTableCreator(lfs, location, p, l, s.TimeRange, s.tsdbOpts.Option, s.metrics)
        if err != nil {
                return nil, err
        }
diff --git a/banyand/internal/storage/storage.go b/banyand/internal/storage/storage.go
index 3e8c148a..1367e367 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -105,7 +105,7 @@ type IndexDB interface {
 type TSDB[T TSTable, O any] interface {
        io.Closer
        CreateSegmentIfNotExist(ts time.Time) (Segment[T, O], error)
-       SelectSegments(timeRange timestamp.TimeRange) []Segment[T, O]
+       SelectSegments(timeRange timestamp.TimeRange) ([]Segment[T, O], error)
        Tick(ts int64)
        UpdateOptions(opts *commonv1.ResourceOpts)
        TakeFileSnapshot(dst string) error
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index e3ac32cc..eca40aa7 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -67,6 +67,7 @@ type TSDBOpts[T TSTable, O any] struct {
        SeriesIndexCacheMaxBytes       int
        ShardNum                       uint32
        DisableRetention               bool
+       SegmentIdleTimeout             time.Duration
 }
 
 type (
@@ -136,8 +137,8 @@ func OpenTSDB[T TSTable, O any](ctx context.Context, opts TSDBOpts[T, O]) (TSDB[
                logger:    l,
                tsEventCh: make(chan int64),
                p:         p,
-               segmentController: newSegmentController[T](ctx, location,
-                       l, opts, indexMetrics, opts.TableMetrics, opts.SegmentBoundaryUpdateFn),
+               segmentController: newSegmentController(ctx, location,
+                       l, opts, indexMetrics, opts.TableMetrics, opts.SegmentBoundaryUpdateFn, opts.SegmentIdleTimeout),
                metrics:          newMetrics(opts.StorageMetricsFactory),
                disableRetention: opts.DisableRetention,
        }
@@ -162,9 +163,9 @@ func (d *database[T, O]) CreateSegmentIfNotExist(ts time.Time) (Segment[T, O], e
        return d.segmentController.createSegment(ts)
 }
 
-func (d *database[T, O]) SelectSegments(timeRange timestamp.TimeRange) []Segment[T, O] {
+func (d *database[T, O]) SelectSegments(timeRange timestamp.TimeRange) ([]Segment[T, O], error) {
        if d.closed.Load() {
-               return nil
+               return nil, nil
        }
        return d.segmentController.selectSegments(timeRange)
 }
@@ -181,7 +182,10 @@ func (d *database[T, O]) TakeFileSnapshot(dst string) error {
                return errors.New("database is closed")
        }
 
-       segments := d.segmentController.segments()
+       segments, err := d.segmentController.segments(true)
+       if err != nil {
+               return errors.Wrap(err, "failed to get segments")
+       }
        defer func() {
                for _, seg := range segments {
                        seg.DecRef()
@@ -239,7 +243,7 @@ func (d *database[T, O]) collect() {
        }
        d.metrics.lastTickTime.Set(float64(d.latestTickTime.Load()))
        refCount := int32(0)
-       ss := d.segmentController.segments()
+       ss, _ := d.segmentController.segments(false)
        for _, s := range ss {
                for _, t := range s.Tables() {
                        t.Collect(d.segmentController.metrics)
diff --git a/banyand/internal/storage/tsdb_test.go b/banyand/internal/storage/tsdb_test.go
index dd96807d..ba5eb623 100644
--- a/banyand/internal/storage/tsdb_test.go
+++ b/banyand/internal/storage/tsdb_test.go
@@ -66,7 +66,8 @@ func TestOpenTSDB(t *testing.T) {
                defer seg.DecRef()
 
                db := tsdb.(*database[*MockTSTable, any])
-               require.Equal(t, len(db.segmentController.segments()), 1)
+               ss, _ := db.segmentController.segments(false)
+               require.Equal(t, len(ss), 1)
                tsdb.Close()
        })
 
@@ -99,7 +100,7 @@ func TestOpenTSDB(t *testing.T) {
                seg.DecRef()
 
                db := tsdb.(*database[*MockTSTable, any])
-               segs := db.segmentController.segments()
+               segs, _ := db.segmentController.segments(false)
                require.Equal(t, len(segs), 1)
                for i := range segs {
                        segs[i].DecRef()
@@ -112,7 +113,7 @@ func TestOpenTSDB(t *testing.T) {
                require.NotNil(t, tsdb)
 
                db = tsdb.(*database[*MockTSTable, any])
-               segs = db.segmentController.segments()
+               segs, _ = db.segmentController.segments(false)
                require.Equal(t, len(segs), 1)
                for i := range segs {
                        segs[i].DecRef()
diff --git a/banyand/liaison/grpc/discovery.go b/banyand/liaison/grpc/discovery.go
index 7eac7940..b85fd472 100644
--- a/banyand/liaison/grpc/discovery.go
+++ b/banyand/liaison/grpc/discovery.go
@@ -19,6 +19,7 @@ package grpc
 
 import (
        "fmt"
+       "strings"
        "sync"
 
        "github.com/pkg/errors"
@@ -162,14 +163,35 @@ func (s *groupRepo) shardNum(groupName string) (uint32, bool) {
        return r.ShardNum, true
 }
 
-func (s *groupRepo) getNodeSelector(groupName string) (string, bool) {
+func (s *groupRepo) getNodeSelector(groupName string, stages []string) ([]string, bool) {
        s.RWMutex.RLock()
        defer s.RWMutex.RUnlock()
        r, ok := s.resourceOpts[groupName]
        if !ok {
-               return "", false
+               return nil, false
        }
-       return r.DefaultNodeSelector, true
+       if len(stages) == 0 {
+               stages = r.DefaultStages
+       }
+       if len(stages) == 0 {
+               return nil, false
+       }
+
+       var nodeSelectors []string
+       for _, stage := range r.Stages {
+               for _, sn := range stages {
+                       if strings.EqualFold(sn, stage.Name) {
+                               ns := stage.NodeSelector
+                               ns = strings.TrimSpace(ns)
+                               if ns == "" {
+                                       continue
+                               }
+                               nodeSelectors = append(nodeSelectors, ns)
+                               break
+                       }
+               }
+       }
+       return nodeSelectors, true
 }
 
 func getID(metadata *commonv1.Metadata) identity {
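(A hedged example of the resolution above, reusing the illustrative group
options sketched earlier: a request carrying Stages: []string{"WARM"} matches
the "warm" stage case-insensitively via strings.EqualFold and yields its node
selector; an empty Stages falls back to DefaultStages, and a group with
neither returns (nil, false).)

    selectors, ok := s.getNodeSelector("sw_metric", []string{"WARM"})
    // ok == true, selectors == []string{"type=warm"}, assuming the group
    // declares {Name: "warm", NodeSelector: "type=warm"} in its ResourceOpts
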
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index a6761208..15c72fa5 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -215,17 +215,13 @@ func (ms *measureService) Query(ctx context.Context, req *measurev1.QueryRequest
                        span.Stop()
                }()
        }
-       nodeSelectors := make(map[string]string)
+       nodeSelectors := make(map[string][]string)
        for _, g := range req.Groups {
-               if req.NodeSelector != "" {
-                       nodeSelectors[g] = req.NodeSelector
-                       continue
-               }
-               if ns, exist := ms.groupRepo.getNodeSelector(g); exist {
+               if ns, exist := ms.groupRepo.getNodeSelector(g, req.Stages); exist {
                        nodeSelectors[g] = ns
-                       continue
+               } else {
+                       nodeSelectors[g] = nil
                }
-               nodeSelectors[g] = ""
        }
 
        feat, err := ms.broadcaster.Publish(ctx, data.TopicMeasureQuery, bus.NewMessageWithNodeSelectors(bus.MessageID(now.UnixNano()), nodeSelectors, req.TimeRange, req))
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index 0bbbef5c..b1920878 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -213,17 +213,13 @@ func (s *streamService) Query(ctx context.Context, req *streamv1.QueryRequest) (
                        span.Stop()
                }()
        }
-       nodeSelectors := make(map[string]string)
+       nodeSelectors := make(map[string][]string)
        for _, g := range req.Groups {
-               if req.NodeSelector != "" {
-                       nodeSelectors[g] = req.NodeSelector
-                       continue
-               }
-               if ns, exist := s.groupRepo.getNodeSelector(g); exist {
+               if ns, exist := s.groupRepo.getNodeSelector(g, req.Stages); exist {
                        nodeSelectors[g] = ns
-                       continue
+               } else {
+                       nodeSelectors[g] = nil
                }
-               nodeSelectors[g] = ""
        }
        message := bus.NewMessageWithNodeSelectors(bus.MessageID(now.UnixNano()), nodeSelectors, req.TimeRange, req)
        feat, errQuery := s.broadcaster.Publish(ctx, data.TopicStreamQuery, message)
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index c9a5f168..85560df8 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -384,8 +384,8 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, error
        shardNum := ro.ShardNum
        ttl := ro.Ttl
        segInterval := ro.SegmentInterval
+       segmentIdleTimeout := time.Duration(0)
        if len(ro.Stages) > 0 && len(s.nodeLabels) > 0 {
-               matched := false
                var ttlNum uint32
                for _, st := range ro.Stages {
                        if st.Ttl.Unit != ro.Ttl.Unit {
@@ -399,14 +399,14 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, error
                        if !selector.Matches(s.nodeLabels) {
                                continue
                        }
-                       matched = true
+                       ttl.Num += ttlNum
                        shardNum = st.ShardNum
                        segInterval = st.SegmentInterval
+                       if st.Close {
+                               segmentIdleTimeout = 5 * time.Minute
+                       }
                        break
                }
-               if matched {
-                       ttl.Num += ttlNum
-               }
        }
        opts := storage.TSDBOpts[*tsTable, option]{
                ShardNum:                       shardNum,
@@ -420,6 +420,7 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, error
                SeriesIndexCacheMaxBytes:       int(s.option.seriesCacheMaxSize),
                StorageMetricsFactory:          factory,
                SegmentBoundaryUpdateFn:        s.metadata.UpdateSegmentsBoundary,
+               SegmentIdleTimeout:             segmentIdleTimeout,
        }
        return storage.OpenTSDB(
                common.SetPosition(context.Background(), func(_ common.Position) common.Position {
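
The stage matching above derives per-node storage options: the first stage
whose selector matches the node's labels extends the base TTL by its own
units, overrides the shard count and segment interval, and, when the stage is
marked Close, enables a five-minute segment idle timeout. A minimal sketch
with stand-in types (segment interval elided); stageOpt and applyStage are
illustrative, not the real schema:

    package sketch

    import "time"

    // stageOpt is an illustrative stand-in for a lifecycle stage.
    type stageOpt struct {
        TTLNum   uint32 // extra TTL units contributed by the stage
        ShardNum uint32
        Close    bool // the stage permits closing idle segments
        Matches  func(labels map[string]string) bool
    }

    // applyStage mirrors the loop above: the first matching stage wins.
    func applyStage(ttlNum, shardNum uint32, labels map[string]string, stages []stageOpt) (uint32, uint32, time.Duration) {
        var idle time.Duration
        for _, st := range stages {
            if !st.Matches(labels) {
                continue
            }
            ttlNum += st.TTLNum
            shardNum = st.ShardNum
            if st.Close {
                idle = 5 * time.Minute
            }
            break
        }
        return ttlNum, shardNum, idle
    }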
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index 9fc92260..27b1ff0f 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -89,7 +89,10 @@ func (s *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) (mqr
                tsdb = db.(storage.TSDB[*tsTable, option])
        }
 
-       segments := tsdb.SelectSegments(*mqo.TimeRange)
+       segments, err := tsdb.SelectSegments(*mqo.TimeRange)
+       if err != nil {
+               return nil, err
+       }
        if len(segments) < 1 {
                return nilResult, nil
        }
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index 8d32d6a0..d49baeef 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -87,7 +87,9 @@ func (p *pub) Serve() run.StopNotify {
        return p.closer.CloseNotify()
 }
 
-func bypassMatches(_ map[string]string) bool { return true }
+var bypassMatches = []MatchFunc{bypassMatch}
+
+func bypassMatch(_ map[string]string) bool { return true }
 
func (p *pub) Broadcast(timeout time.Duration, topic bus.Topic, messages bus.Message) ([]bus.Future, error) {
        var nodes []*databasev1.Node
@@ -106,23 +108,28 @@ func (p *pub) Broadcast(timeout time.Duration, topic bus.Topic, messages bus.Mes
                }
        } else {
                for g, sel := range messages.NodeSelectors() {
-                       var matches func(map[string]string) bool
-                       if sel == "" {
+                       var matches []MatchFunc
+                       if sel == nil {
                                matches = bypassMatches
                        } else {
-                               selector, err := ParseLabelSelector(sel)
-                               if err != nil {
-                                       return nil, fmt.Errorf("failed to parse node selector: %w", err)
+                               for _, s := range sel {
+                                       selector, err := ParseLabelSelector(s)
+                                       if err != nil {
+                                               return nil, fmt.Errorf("failed to parse node selector: %w", err)
+                                       }
+                                       matches = append(matches, selector.Matches)
                                }
-                               matches = selector.Matches
                        }
                        for _, n := range nodes {
                                tr := metadata.FindSegmentsBoundary(n, g)
                                if tr == nil {
                                        continue
                                }
-                               if matches(n.Labels) && timestamp.PbHasOverlap(messages.TimeRange(), tr) {
-                                       names[n.Metadata.Name] = struct{}{}
+                               for _, m := range matches {
+                                       if m(n.Labels) && timestamp.PbHasOverlap(messages.TimeRange(), tr) {
+                                               names[n.Metadata.Name] = struct{}{}
+                                               break
+                                       }
                                }
                        }
                }
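
With selectors now a list per group, the broadcast above ORs them: a node
receives the query when any stage selector matches its labels and its segment
boundary overlaps the query time range. The matching step alone, as a small
sketch (anyMatch is a hypothetical helper; MatchFunc is the type added in
selector.go below):

    package sketch

    // MatchFunc mirrors the type introduced in selector.go.
    type MatchFunc func(labels map[string]string) bool

    // anyMatch reports whether any selector accepts the node's labels; the
    // bypass list substituted for nil selectors accepts every node.
    func anyMatch(matches []MatchFunc, labels map[string]string) bool {
        for _, m := range matches {
            if m(labels) {
                return true
            }
        }
        return false
    }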
diff --git a/banyand/queue/pub/pub_test.go b/banyand/queue/pub/pub_test.go
index 9abb59f2..5bd525e8 100644
--- a/banyand/queue/pub/pub_test.go
+++ b/banyand/queue/pub/pub_test.go
@@ -327,8 +327,8 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() {
                        node2 := getDataNodeWithLabels("node2", addr2, node2Labels, node2Boundaries)
                        p.OnAddOrUpdate(node2)
 
-                       nodeSelectors := map[string]string{
-                               group1: "",
+                       nodeSelectors := map[string][]string{
+                               group1: {""},
                        }
 
                        ff, err := p.Broadcast(3*time.Second, data.TopicStreamQuery,
@@ -384,8 +384,8 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() {
                        node2 := getDataNodeWithLabels("node2", addr2, node2Labels, node2Boundaries)
                        p.OnAddOrUpdate(node2)
 
-                       nodeSelectors := map[string]string{
-                               group1: "role=ingest",
+                       nodeSelectors := map[string][]string{
+                               group1: {"role=ingest"},
                        }
 
                        ff, err := p.Broadcast(3*time.Second, data.TopicStreamQuery,
@@ -459,8 +459,8 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() {
                                End:   timestamppb.New(now.Add(-1 * time.Hour).Add(-30 * time.Minute)),
                        }
 
-                       nodeSelectors := map[string]string{
-                               group1: "",
+                       nodeSelectors := map[string][]string{
+                               group1: {""},
                        }
 
                        ff, err := p.Broadcast(3*time.Second, data.TopicStreamQuery,
@@ -553,8 +553,8 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() {
                                End:   timestamppb.New(now.Add(30 * time.Minute)),
                        }
 
-                       nodeSelectors := map[string]string{
-                               group1: "env=prod",
+                       nodeSelectors := map[string][]string{
+                               group1: {"env=prod"},
                        }
 
                        ff, err := p.Broadcast(3*time.Second, data.TopicStreamQuery,
diff --git a/banyand/queue/pub/selector.go b/banyand/queue/pub/selector.go
index fb9b741e..fc277435 100644
--- a/banyand/queue/pub/selector.go
+++ b/banyand/queue/pub/selector.go
@@ -29,6 +29,9 @@ type LabelSelector struct {
        criteria []condition
 }
 
+// MatchFunc is a function that matches labels.
+type MatchFunc func(labels map[string]string) bool
+
 type condition struct {
        Key    string
        Values []string
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 4891b373..55624e6f 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -296,8 +296,8 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, error
        shardNum := ro.ShardNum
        ttl := ro.Ttl
        segInterval := ro.SegmentInterval
+       segmentIdleTimeout := time.Duration(0)
        if len(ro.Stages) > 0 && len(s.nodeLabels) > 0 {
-               matched := false
                var ttlNum uint32
                for _, st := range ro.Stages {
                        if st.Ttl.Unit != ro.Ttl.Unit {
@@ -311,14 +311,14 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, error
                        if !selector.Matches(s.nodeLabels) {
                                continue
                        }
-                       matched = true
+                       ttl.Num += ttlNum
                        shardNum = st.ShardNum
                        segInterval = st.SegmentInterval
+                       if st.Close {
+                               segmentIdleTimeout = 5 * time.Minute
+                       }
                        break
                }
-               if matched {
-                       ttl.Num += ttlNum
-               }
        }
        opts := storage.TSDBOpts[*tsTable, option]{
                ShardNum:                       shardNum,
@@ -332,6 +332,7 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, error
                SeriesIndexCacheMaxBytes:       int(s.option.seriesCacheMaxSize),
                StorageMetricsFactory:          s.omr.With(storageScope.ConstLabels(meter.ToLabelPairs(common.DBLabelNames(), p.DBLabelValues()))),
                SegmentBoundaryUpdateFn:        s.metadata.UpdateSegmentsBoundary,
+               SegmentIdleTimeout:             segmentIdleTimeout,
        }
        return storage.OpenTSDB(
                common.SetPosition(context.Background(), func(_ common.Position) common.Position {
diff --git a/banyand/stream/query.go b/banyand/stream/query.go
index 0308a278..8d195e62 100644
--- a/banyand/stream/query.go
+++ b/banyand/stream/query.go
@@ -43,15 +43,55 @@ import (
 const checkDoneEvery = 128
 
func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr model.StreamQueryResult, err error) {
+       if err = validateQueryInput(sqo); err != nil {
+               return nil, err
+       }
+
+       tsdb, err := s.getTSDB()
+       if err != nil {
+               return nil, err
+       }
+
+       segments, err := tsdb.SelectSegments(*sqo.TimeRange)
+       if err != nil {
+               return nil, err
+       }
+       if len(segments) < 1 {
+               return bypassQueryResultInstance, nil
+       }
+
+       defer func() {
+               if err != nil {
+                       sqr.Release()
+               }
+       }()
+
+       series := prepareSeriesData(sqo)
+       qo := prepareQueryOptions(sqo)
+       tr := index.NewIntRangeOpts(qo.minTimestamp, qo.maxTimestamp, true, true)
+
+       if sqo.Order == nil || sqo.Order.Index == nil {
+               return s.executeTimeSeriesQuery(segments, series, qo, &tr), nil
+       }
+
+       return s.executeIndexedQuery(ctx, segments, series, sqo, &tr)
+}
+
+func validateQueryInput(sqo model.StreamQueryOptions) error {
        if sqo.TimeRange == nil || len(sqo.Entities) < 1 {
-               return nil, errors.New("invalid query options: timeRange and series are required")
+               return errors.New("invalid query options: timeRange and series are required")
        }
        if len(sqo.TagProjection) == 0 {
-               return nil, errors.New("invalid query options: tagProjection is required")
+               return errors.New("invalid query options: tagProjection is required")
        }
+       return nil
+}
+
+func (s *stream) getTSDB() (storage.TSDB[*tsTable, option], error) {
        var tsdb storage.TSDB[*tsTable, option]
        db := s.tsdb.Load()
        if db == nil {
+               var err error
                tsdb, err = s.schemaRepo.loadTSDB(s.group)
                if err != nil {
                        return nil, err
@@ -60,15 +100,10 @@ func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr m
        } else {
                tsdb = db.(storage.TSDB[*tsTable, option])
        }
-       segments := tsdb.SelectSegments(*sqo.TimeRange)
-       if len(segments) < 1 {
-               return bypassQueryResultInstance, nil
-       }
-       defer func() {
-               if err != nil {
-                       sqr.Release()
-               }
-       }()
+       return tsdb, nil
+}
+
+func prepareSeriesData(sqo model.StreamQueryOptions) []*pbv1.Series {
        series := make([]*pbv1.Series, len(sqo.Entities))
        for i := range sqo.Entities {
                series[i] = &pbv1.Series{
@@ -76,30 +111,90 @@ func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr m
                        EntityValues: sqo.Entities[i],
                }
        }
-       qo := queryOptions{
+       return series
+}
+
+func prepareQueryOptions(sqo model.StreamQueryOptions) queryOptions {
+       return queryOptions{
                StreamQueryOptions: sqo,
                minTimestamp:       sqo.TimeRange.Start.UnixNano(),
                maxTimestamp:       sqo.TimeRange.End.UnixNano(),
        }
-       tr := index.NewIntRangeOpts(qo.minTimestamp, qo.maxTimestamp, true, true)
+}
 
-       if sqo.Order == nil || sqo.Order.Index == nil {
-               result := &tsResult{
-                       segments: segments,
-                       series:   series,
-                       qo:       qo,
-                       sm:       s,
-                       pm:       s.pm,
-                       l:        s.l,
-                       tr:       &tr,
-               }
-               if sqo.Order == nil {
-                       result.asc = true
-               } else if sqo.Order.Sort == modelv1.Sort_SORT_ASC || sqo.Order.Sort == modelv1.Sort_SORT_UNSPECIFIED {
-                       result.asc = true
-               }
-               return result, nil
+func (s *stream) executeTimeSeriesQuery(
+       segments []storage.Segment[*tsTable, option],
+       series []*pbv1.Series,
+       qo queryOptions,
+       tr *index.RangeOpts,
+) model.StreamQueryResult {
+       result := &tsResult{
+               segments: segments,
+               series:   series,
+               qo:       qo,
+               sm:       s,
+               pm:       s.pm,
+               l:        s.l,
+               tr:       tr,
+       }
+
+       // Determine ascending order
+       if qo.Order == nil {
+               result.asc = true
+       } else if qo.Order.Sort == modelv1.Sort_SORT_ASC || qo.Order.Sort == modelv1.Sort_SORT_UNSPECIFIED {
+               result.asc = true
+       }
+
+       return result
+}
+
+func (s *stream) executeIndexedQuery(
+       ctx context.Context,
+       segments []storage.Segment[*tsTable, option],
+       series []*pbv1.Series,
+       sqo model.StreamQueryOptions,
+       tr *index.RangeOpts,
+) (model.StreamQueryResult, error) {
+       result, seriesFilter, resultTS, err := s.processSegmentsAndBuildFilters(ctx, segments, series, sqo, tr)
+       if err != nil {
+               return nil, err
        }
+
+       if seriesFilter.IsEmpty() {
+               result.Release()
+               return nil, nil
+       }
+
+       // Update time range if needed
+       sids := seriesFilter.ToSlice()
+       startTS := sqo.TimeRange.Start.UnixNano()
+       endTS := sqo.TimeRange.End.UnixNano()
+       minTS, maxTS := updateTimeRange(resultTS, startTS, endTS)
+       if minTS > startTS || maxTS < endTS {
+               newTR := timestamp.NewTimeRange(time.Unix(0, minTS), time.Unix(0, maxTS), sqo.TimeRange.IncludeStart, sqo.TimeRange.IncludeEnd)
+               sqo.TimeRange = &newTR
+       }
+
+       // Perform index-based sorting
+       if result.sortingIter, err = s.indexSort(ctx, sqo, result.tabs, sids); err != nil {
+               return nil, err
+       }
+
+       // Set ascending flag
+       if sqo.Order.Sort == modelv1.Sort_SORT_ASC || sqo.Order.Sort == modelv1.Sort_SORT_UNSPECIFIED {
+               result.asc = true
+       }
+
+       return &result, nil
+}
+
+func (s *stream) processSegmentsAndBuildFilters(
+       ctx context.Context,
+       segments []storage.Segment[*tsTable, option],
+       series []*pbv1.Series,
+       sqo model.StreamQueryOptions,
+       tr *index.RangeOpts,
+) (idxResult, posting.List, posting.List, error) {
        var result idxResult
        result.pm = s.pm
        result.segments = segments
@@ -108,32 +203,39 @@ func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr m
                StreamQueryOptions: sqo,
                seriesToEntity:     make(map[common.SeriesID][]*modelv1.TagValue),
        }
-       var sl pbv1.SeriesList
+
        seriesFilter := roaring.NewPostingList()
        var resultTS posting.List
+       var sl pbv1.SeriesList
+       var err error
+
        for i := range result.segments {
                sl, err = result.segments[i].Lookup(ctx, series)
                if err != nil {
-                       return nil, err
+                       return result, nil, nil, err
                }
+
                var filter, filterTS posting.List
-               if filter, filterTS, err = indexSearch(ctx, sqo, segments[i].Tables(), sl.ToList().ToSlice(), &tr); err != nil {
-                       return nil, err
+               if filter, filterTS, err = indexSearch(ctx, sqo, segments[i].Tables(), sl.ToList().ToSlice(), tr); err != nil {
+                       return result, nil, nil, err
                }
+
                if filter != nil && filter.IsEmpty() {
                        continue
                }
+
                if result.qo.elementFilter == nil {
                        result.qo.elementFilter = filter
                        resultTS = filterTS
                } else {
                        if err = result.qo.elementFilter.Union(filter); err != nil {
-                               return nil, err
+                               return result, nil, nil, err
                        }
                        if err = resultTS.Union(filterTS); err != nil {
-                               return nil, err
+                               return result, nil, nil, err
                        }
                }
+
                for j := range sl {
                        if seriesFilter.Contains(uint64(sl[j].ID)) {
                                continue
@@ -141,28 +243,11 @@ func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr m
                        seriesFilter.Insert(uint64(sl[j].ID))
                        result.qo.seriesToEntity[sl[j].ID] = sl[j].EntityValues
                }
+
                result.tabs = append(result.tabs, result.segments[i].Tables()...)
        }
 
-       if seriesFilter.IsEmpty() {
-               result.Release()
-               return nil, nil
-       }
-       sids := seriesFilter.ToSlice()
-       startTS := sqo.TimeRange.Start.UnixNano()
-       endTS := sqo.TimeRange.End.UnixNano()
-       minTS, maxTS := updateTimeRange(resultTS, startTS, endTS)
-       if minTS > startTS || maxTS < endTS {
-               newTR := timestamp.NewTimeRange(time.Unix(0, minTS), time.Unix(0, maxTS), sqo.TimeRange.IncludeStart, sqo.TimeRange.IncludeEnd)
-               sqo.TimeRange = &newTR
-       }
-       if result.sortingIter, err = s.indexSort(ctx, sqo, result.tabs, sids); err != nil {
-               return nil, err
-       }
-       if sqo.Order.Sort == modelv1.Sort_SORT_ASC || sqo.Order.Sort == modelv1.Sort_SORT_UNSPECIFIED {
-               result.asc = true
-       }
-       return &result, nil
+       return result, seriesFilter, resultTS, nil
 }
 
 type queryOptions struct {
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 9e13a442..c90c4b39 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -481,7 +481,7 @@ Metadata is for multi-tenant, multi-model use
 | segment_interval | [IntervalRule](#banyandb-common-v1-IntervalRule) |  | segment_interval indicates the length of a segment |
 | ttl | [IntervalRule](#banyandb-common-v1-IntervalRule) |  | ttl indicates time to live, how long the data will be cached |
 | stages | [LifecycleStage](#banyandb-common-v1-LifecycleStage) | repeated | stages defines the ordered lifecycle stages. Data progresses through these stages sequentially. |
-| default_node_selector | [string](#string) |  | default_node_selector is the default node selector for queries if node_selector is not specified |
+| default_stages | [string](#string) | repeated | default_stages is the name of the default stage |
 
 
 
@@ -2961,7 +2961,7 @@ QueryRequest is the request contract for query.
 | limit | [uint32](#uint32) |  | limit is used to impose a boundary on the number of records being returned. If top is specified, limit processes the dataset based on top&#39;s output |
 | order_by | [banyandb.model.v1.QueryOrder](#banyandb-model-v1-QueryOrder) |  | order_by is given to specify the sort for a tag. |
 | trace | [bool](#bool) |  | trace is used to enable trace for the query |
-| node_selector | [string](#string) |  | node_selector is used to specify the target node for the query |
+| stages | [string](#string) | repeated | stages is used to specify the stage of the data points in the lifecycle |
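
A hypothetical client-side use of the new field, assuming the repository's
generated Go bindings (the import path follows the repository layout; the
group and stage names are illustrative):

    package sketch

    import (
        measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
    )

    // newStagedQuery restricts a measure query to the "hot" and "warm"
    // lifecycle stages; leaving Stages empty falls back to the group's
    // default_stages, per getNodeSelector above.
    func newStagedQuery() *measurev1.QueryRequest {
        return &measurev1.QueryRequest{
            Groups: []string{"sw_metric"},
            Stages: []string{"hot", "warm"},
        }
    }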
 
 
 
@@ -3471,7 +3471,6 @@ QueryRequest is the request contract for query.
 | tag_projection | [string](#string) | repeated | tag_projection can be used to select tags of the data points in the response |
 | limit | [uint32](#uint32) |  |  |
 | trace | [bool](#bool) |  | trace is used to enable trace for the query |
-| node_selector | [string](#string) |  | node_selector is used to select the node to query |
 
 
 
@@ -3571,7 +3570,7 @@ QueryRequest is the request contract for query.
 | criteria | [banyandb.model.v1.Criteria](#banyandb-model-v1-Criteria) |  | tag_families are indexed. |
 | projection | [banyandb.model.v1.TagProjection](#banyandb-model-v1-TagProjection) |  | projection can be used to select the key names of the element in the response |
 | trace | [bool](#bool) |  | trace is used to enable trace for the query |
-| node_selector | [string](#string) |  | node_selector is used to select the node to query |
+| stages | [string](#string) | repeated | stages is used to specify the stage of the query in the lifecycle |
 
 
 
diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go
index ccbd390a..745000d0 100644
--- a/pkg/bus/bus.go
+++ b/pkg/bus/bus.go
@@ -48,7 +48,7 @@ type (
 // Message is sent on the bus to all subscribed listeners.
 type Message struct {
        payload       payload
-       nodeSelectors map[string]string
+       nodeSelectors map[string][]string
        timeRange     *modelv1.TimeRange
        node          string
        id            MessageID
@@ -71,7 +71,7 @@ func (m Message) Node() string {
 }
 
 // NodeSelectors returns the node selectors of the Message.
-func (m Message) NodeSelectors() map[string]string {
+func (m Message) NodeSelectors() map[string][]string {
        return m.nodeSelectors
 }
 
@@ -102,7 +102,7 @@ func NewMessageWithNode(id MessageID, node string, data interface{}) Message {
 
 // NewMessageWithNodeSelectors returns a new Message with a MessageID and NodeSelectors and embedded data.
 // Nodes matching any of the selectors will receive the message.
-func NewMessageWithNodeSelectors(id MessageID, nodeSelectors map[string]string, timeRange *modelv1.TimeRange, data interface{}) Message {
+func NewMessageWithNodeSelectors(id MessageID, nodeSelectors map[string][]string, timeRange *modelv1.TimeRange, data interface{}) Message {
        return Message{id: id, nodeSelectors: nodeSelectors, timeRange: timeRange, payload: data}
 }
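
Putting the pieces together, a liaison-side caller might build a broadcast
message as below; this is a minimal sketch, assuming the repository's import
layout, with placeholder group names and payload:

    package sketch

    import (
        "time"

        modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/bus"
    )

    // broadcastMessage shows the new map[string][]string selector shape: a
    // nil slice bypasses label matching for that group, while multiple
    // selectors are ORed per node in pub.Broadcast.
    func broadcastMessage(tr *modelv1.TimeRange, payload interface{}) bus.Message {
        selectors := map[string][]string{
            "sw_metric": {"type=hot", "type=warm"}, // either stage's nodes match
            "sw_log":    nil,                       // no stages: all nodes match
        }
        return bus.NewMessageWithNodeSelectors(bus.MessageID(time.Now().UnixNano()), selectors, tr, payload)
    }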
 
