This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new d5f5dbfc Change the storage layer (#492)
d5f5dbfc is described below

commit d5f5dbfc0b6abf93f468321c57c193a2d74369e9
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Mon Jul 22 21:04:52 2024 +0800

    Change the storage layer (#492)
    
    Signed-off-by: Gao Hongtao <hanahm...@gmail.com>
---
 CHANGES.md                                |   2 +
 banyand/internal/storage/README.md        |   3 +-
 banyand/internal/storage/index.go         | 251 ++----------------------------
 banyand/internal/storage/index_test.go    |  59 +------
 banyand/internal/storage/rotation.go      |  64 +++-----
 banyand/internal/storage/rotation_test.go |  26 +---
 banyand/internal/storage/segment.go       | 202 +++++++++++++++---------
 banyand/internal/storage/shard.go         |  57 ++++---
 banyand/internal/storage/storage.go       |  22 +--
 banyand/internal/storage/tsdb.go          | 143 +++--------------
 banyand/measure/datapoints.go             |   6 +-
 banyand/measure/query.go                  | 143 ++++++++++-------
 banyand/measure/write.go                  |  55 +++++--
 banyand/stream/benchmark_test.go          |  51 ++----
 banyand/stream/elements.go                |   6 +-
 banyand/stream/query.go                   |  54 ++++---
 banyand/stream/query_test.go              |  19 +--
 banyand/stream/write.go                   |  39 +++--
 18 files changed, 439 insertions(+), 763 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index d3362043..58b59f87 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -7,6 +7,8 @@ Release Notes.
 ### File System Changes
 
 - Bump up the version of the file system to 1.1.0 which is not compatible with 
the previous version.
+- Move the series index into segment.
+- Swap the segment and the shard.
 
 ### Features
 
diff --git a/banyand/internal/storage/README.md 
b/banyand/internal/storage/README.md
index 77f177c6..f55c26b7 100644
--- a/banyand/internal/storage/README.md
+++ b/banyand/internal/storage/README.md
@@ -1,2 +1,3 @@
 # File Version Compatibility Information
-The current file version is 1.0.0, compatible with versions specified in 
version.yml.
+
+The current file version is 1.1.0, compatible with versions specified in 
version.yml.
diff --git a/banyand/internal/storage/index.go 
b/banyand/internal/storage/index.go
index d5c935fd..f3fa2f04 100644
--- a/banyand/internal/storage/index.go
+++ b/banyand/internal/storage/index.go
@@ -19,14 +19,7 @@ package storage
 
 import (
        "context"
-       "fmt"
        "path"
-       "path/filepath"
-       "sort"
-       "strconv"
-       "strings"
-       "sync"
-       "time"
 
        "github.com/pkg/errors"
        "go.uber.org/multierr"
@@ -40,33 +33,28 @@ import (
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/query"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
-       "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
-func (d *database[T, O]) IndexDB() IndexDB {
-       return d.indexController.getHot()
+func (s *segment[T, O]) IndexDB() IndexDB {
+       return s.index
 }
 
-func (d *database[T, O]) Lookup(ctx context.Context, series []*pbv1.Series) 
(pbv1.SeriesList, error) {
-       return d.indexController.searchPrimary(ctx, series)
+func (s *segment[T, O]) Lookup(ctx context.Context, series []*pbv1.Series) 
(pbv1.SeriesList, error) {
+       return s.index.searchPrimary(ctx, series)
 }
 
 type seriesIndex struct {
-       startTime time.Time
-       store     index.SeriesStore
-       l         *logger.Logger
-       path      string
+       store index.SeriesStore
+       l     *logger.Logger
 }
 
-func newSeriesIndex(ctx context.Context, path string, startTime time.Time, 
flushTimeoutSeconds int64) (*seriesIndex, error) {
+func newSeriesIndex(ctx context.Context, root string, flushTimeoutSeconds 
int64) (*seriesIndex, error) {
        si := &seriesIndex{
-               path:      path,
-               startTime: startTime,
-               l:         logger.Fetch(ctx, "series_index"),
+               l: logger.Fetch(ctx, "series_index"),
        }
        var err error
        if si.store, err = inverted.NewStore(inverted.StoreOpts{
-               Path:         path,
+               Path:         path.Join(root, "sidx"),
                Logger:       si.l,
                BatchWaitSec: flushTimeoutSeconds,
        }); err != nil {
@@ -301,224 +289,3 @@ func appendSeriesList(dest, src pbv1.SeriesList, target 
common.SeriesID) pbv1.Se
 func (s *seriesIndex) Close() error {
        return s.store.Close()
 }
-
-type seriesIndexController[T TSTable, O any] struct {
-       clock           timestamp.Clock
-       hot             *seriesIndex
-       standby         *seriesIndex
-       l               *logger.Logger
-       location        string
-       opts            TSDBOpts[T, O]
-       standbyLiveTime time.Duration
-       sync.RWMutex
-}
-
-func newSeriesIndexController[T TSTable, O any](
-       ctx context.Context,
-       opts TSDBOpts[T, O],
-) (*seriesIndexController[T, O], error) {
-       l := logger.Fetch(ctx, "seriesIndexController")
-       clock, ctx := timestamp.GetClock(ctx)
-       var standbyLiveTime time.Duration
-       switch opts.TTL.Unit {
-       case HOUR:
-               standbyLiveTime = time.Hour
-       case DAY:
-               standbyLiveTime = 24 * time.Hour
-       default:
-       }
-       sic := &seriesIndexController[T, O]{
-               opts:            opts,
-               clock:           clock,
-               standbyLiveTime: standbyLiveTime,
-               location:        filepath.Clean(opts.Location),
-               l:               l,
-       }
-       idxName, err := sic.loadIdx()
-       if err != nil {
-               return nil, err
-       }
-       switch len(idxName) {
-       case 0:
-               if sic.hot, err = sic.newIdx(ctx, clock.Now()); err != nil {
-                       return nil, err
-               }
-       case 1:
-               if sic.hot, err = sic.openIdx(ctx, idxName[0]); err != nil {
-                       return nil, err
-               }
-       case 2:
-               if sic.hot, err = sic.openIdx(ctx, idxName[0]); err != nil {
-                       return nil, err
-               }
-               if sic.standby, err = sic.openIdx(ctx, idxName[1]); err != nil {
-                       return nil, err
-               }
-       default:
-               return nil, errors.New("unexpected series index count")
-       }
-       return sic, nil
-}
-
-func (sic *seriesIndexController[T, O]) getHot() *seriesIndex {
-       sic.RLock()
-       defer sic.RUnlock()
-       return sic.hot
-}
-
-func (sic *seriesIndexController[T, O]) loadIdx() ([]string, error) {
-       idxName := make([]string, 0)
-       if err := walkDir(
-               sic.location,
-               "idx",
-               func(suffix string) error {
-                       idxName = append(idxName, "idx-"+suffix)
-                       return nil
-               }); err != nil {
-               return nil, err
-       }
-       sort.StringSlice(idxName).Sort()
-       if len(idxName) > 2 {
-               redundantIdx := idxName[:len(idxName)-2]
-               for i := range redundantIdx {
-                       lfs.MustRMAll(filepath.Join(sic.location, 
redundantIdx[i]))
-               }
-               idxName = idxName[len(idxName)-2:]
-       }
-       return idxName, nil
-}
-
-func (sic *seriesIndexController[T, O]) newIdx(ctx context.Context, now 
time.Time) (*seriesIndex, error) {
-       ts := sic.opts.TTL.Unit.standard(now)
-       return sic.openIdx(ctx, fmt.Sprintf("idx-%016x", ts.UnixNano()))
-}
-
-func (sic *seriesIndexController[T, O]) newNextIdx(ctx context.Context, now 
time.Time) (*seriesIndex, error) {
-       ts := sic.opts.TTL.Unit.standard(now)
-       ts = ts.Add(sic.standbyLiveTime)
-       return sic.openIdx(ctx, fmt.Sprintf("idx-%016x", ts.UnixNano()))
-}
-
-func (sic *seriesIndexController[T, O]) openIdx(ctx context.Context, name 
string) (*seriesIndex, error) {
-       p := path.Join(sic.location, name)
-       if ts, ok := strings.CutPrefix(name, "idx-"); ok {
-               t, err := strconv.ParseInt(ts, 16, 64)
-               if err != nil {
-                       return nil, err
-               }
-
-               return newSeriesIndex(ctx, p, 
sic.opts.TTL.Unit.standard(time.Unix(0, t)), 
sic.opts.SeriesIndexFlushTimeoutSeconds)
-       }
-       return nil, errors.New("unexpected series index name")
-}
-
-func (sic *seriesIndexController[T, O]) run(now, deadline time.Time) (err 
error) {
-       ctx := context.WithValue(context.Background(), logger.ContextKey, sic.l)
-       if _, err := sic.loadIdx(); err != nil {
-               sic.l.Warn().Err(err).Msg("fail to clear redundant series 
index")
-       }
-
-       sic.Lock()
-       defer sic.Unlock()
-       if sic.hot.startTime.Compare(deadline) <= 0 {
-               return sic.handleStandby(ctx, now, deadline)
-       }
-
-       if sic.standby != nil {
-               return nil
-       }
-       liveTime := sic.hot.startTime.Sub(deadline)
-       if liveTime <= 0 || liveTime > sic.standbyLiveTime {
-               return nil
-       }
-       return sic.createStandby(ctx, now, deadline)
-}
-
-func (sic *seriesIndexController[T, O]) handleStandby(ctx context.Context, 
now, deadline time.Time) error {
-       sic.l.Info().Time("deadline", deadline).Msg("start to swap series 
index")
-
-       if sic.standby == nil {
-               var err error
-               sic.standby, err = sic.newIdx(ctx, now)
-               if err != nil {
-                       return err
-               }
-       }
-
-       standby := sic.hot
-       sic.hot = sic.standby
-       sic.standby = nil
-
-       if err := standby.Close(); err != nil {
-               sic.l.Warn().Err(err).Msg("fail to close standby series index")
-       }
-
-       lfs.MustRMAll(standby.path)
-       sic.l.Info().Str("path", standby.path).Msg("dropped series index")
-       lfs.SyncPath(sic.location)
-
-       return nil
-}
-
-func (sic *seriesIndexController[T, O]) createStandby(ctx context.Context, 
now, deadline time.Time) error {
-       if sic.standby != nil {
-               return nil
-       }
-       sic.l.Info().Time("deadline", deadline).Msg("start to create standby 
series index")
-       standby, err := sic.newNextIdx(ctx, now)
-       if err != nil {
-               sic.l.Error().Err(err).Msg("fail to create standby series 
index")
-               return err
-       }
-
-       sic.standby = standby
-       return nil
-}
-
-func (sic *seriesIndexController[T, O]) Write(docs index.Documents) error {
-       sic.RLock()
-       defer sic.RUnlock()
-       if sic.standby != nil {
-               return sic.standby.Write(docs)
-       }
-       return sic.hot.Write(docs)
-}
-
-func (sic *seriesIndexController[T, O]) searchPrimary(ctx context.Context, 
series []*pbv1.Series) (pbv1.SeriesList, error) {
-       sic.RLock()
-       defer sic.RUnlock()
-
-       sl, err := sic.hot.searchPrimary(ctx, series)
-       if err != nil {
-               return nil, err
-       }
-       if len(sl) > 0 || sic.standby == nil {
-               return sl, nil
-       }
-       return sic.standby.searchPrimary(ctx, series)
-}
-
-func (sic *seriesIndexController[T, O]) Search(ctx context.Context, series 
[]*pbv1.Series,
-       filter index.Filter, order *pbv1.OrderBy, preloadSize int,
-) (pbv1.SeriesList, error) {
-       sic.RLock()
-       defer sic.RUnlock()
-
-       sl, err := sic.hot.Search(ctx, series, filter, order, preloadSize)
-       if err != nil {
-               return nil, err
-       }
-       if len(sl) > 0 || sic.standby == nil {
-               return sl, nil
-       }
-       return sic.standby.Search(ctx, series, filter, order, preloadSize)
-}
-
-func (sic *seriesIndexController[T, O]) Close() error {
-       sic.Lock()
-       defer sic.Unlock()
-       if sic.standby != nil {
-               return multierr.Combine(sic.hot.Close(), sic.standby.Close())
-       }
-       return sic.hot.Close()
-}
diff --git a/banyand/internal/storage/index_test.go 
b/banyand/internal/storage/index_test.go
index 43bd92a9..e28a6f54 100644
--- a/banyand/internal/storage/index_test.go
+++ b/banyand/internal/storage/index_test.go
@@ -20,10 +20,7 @@ package storage
 import (
        "context"
        "fmt"
-       "os"
-       "path"
        "testing"
-       "time"
 
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
@@ -41,7 +38,7 @@ var testSeriesPool pbv1.SeriesPool
 func TestSeriesIndex_Primary(t *testing.T) {
        ctx := context.Background()
        path, fn := setUp(require.New(t))
-       si, err := newSeriesIndex(ctx, path, time.Now(), 0)
+       si, err := newSeriesIndex(ctx, path, 0)
        require.NoError(t, err)
        defer func() {
                require.NoError(t, si.Close())
@@ -72,7 +69,7 @@ func TestSeriesIndex_Primary(t *testing.T) {
        require.NoError(t, si.Write(docs))
        // Restart the index
        require.NoError(t, si.Close())
-       si, err = newSeriesIndex(ctx, path, time.Now(), 0)
+       si, err = newSeriesIndex(ctx, path, 0)
        require.NoError(t, err)
        tests := []struct {
                name         string
@@ -185,55 +182,3 @@ func setUp(t *require.Assertions) (tempDir string, 
deferFunc func()) {
        tempDir, deferFunc = test.Space(t)
        return tempDir, deferFunc
 }
-
-func TestSeriesIndexController(t *testing.T) {
-       ttl := IntervalRule{
-               Unit: DAY,
-               Num:  3,
-       }
-       t.Run("Test setup", func(t *testing.T) {
-               ctx := context.Background()
-               tmpDir, dfFn, err := test.NewSpace()
-               require.NoError(t, err)
-               defer dfFn()
-
-               opts := TSDBOpts[TSTable, any]{
-                       Location: tmpDir,
-                       TTL:      ttl,
-               }
-
-               sic, err := newSeriesIndexController(ctx, opts)
-               assert.NoError(t, err)
-               assert.NotNil(t, sic)
-               idxNames := make([]string, 0)
-               walkDir(tmpDir, "idx-", func(suffix string) error {
-                       idxNames = append(idxNames, suffix)
-                       return nil
-               })
-               assert.Equal(t, 1, len(idxNames))
-               require.NoError(t, sic.Close())
-               sic, err = newSeriesIndexController(ctx, opts)
-               assert.NoError(t, err)
-               assert.NotNil(t, sic)
-               idxNames = idxNames[:0]
-               walkDir(tmpDir, "idx-", func(suffix string) error {
-                       idxNames = append(idxNames, suffix)
-                       return nil
-               })
-               assert.Equal(t, 1, len(idxNames))
-               require.NoError(t, sic.Close())
-
-               require.NoError(t, os.MkdirAll(path.Join(tmpDir, 
fmt.Sprintf("idx-%016x", time.Now().UnixNano()-20000)), 0o755))
-               require.NoError(t, os.MkdirAll(path.Join(tmpDir, 
fmt.Sprintf("idx-%016x", time.Now().UnixNano()-10000)), 0o755))
-               sic, err = newSeriesIndexController(ctx, opts)
-               assert.NoError(t, err)
-               assert.NotNil(t, sic)
-               idxNames = idxNames[:0]
-               walkDir(tmpDir, "idx-", func(suffix string) error {
-                       idxNames = append(idxNames, suffix)
-                       return nil
-               })
-               assert.Equal(t, 2, len(idxNames))
-               require.NoError(t, sic.Close())
-       })
-}
diff --git a/banyand/internal/storage/rotation.go 
b/banyand/internal/storage/rotation.go
index 359e937d..9850dc62 100644
--- a/banyand/internal/storage/rotation.go
+++ b/banyand/internal/storage/rotation.go
@@ -51,35 +51,30 @@ func (d *database[T, O]) startRotationTask() error {
                                defer d.rotationProcessOn.Store(false)
                                t := time.Unix(0, ts)
                                rt.run(t, d.logger)
-                               shardsRef := d.sLst.Load()
-                               if shardsRef == nil {
-                                       return
-                               }
-                               for _, s := range *shardsRef {
-                                       func(s *shard[T, O]) {
-                                               ss := 
s.segmentController.segments()
-                                               if len(ss) == 0 {
-                                                       return
+                               func() {
+                                       ss := d.segmentController.segments()
+                                       if len(ss) == 0 {
+                                               return
+                                       }
+                                       defer func() {
+                                               for i := 0; i < len(ss); i++ {
+                                                       ss[i].DecRef()
                                                }
-                                               defer func() {
-                                                       for i := 0; i < 
len(ss); i++ {
-                                                               ss[i].DecRef()
-                                                       }
-                                               }()
-                                               latest := ss[len(ss)-1]
-                                               gap := latest.End.UnixNano() - 
ts
-                                               // gap <=0 means the event is 
from the future
-                                               // the segment will be created 
by a written event directly
-                                               if gap <= 0 || gap > 
newSegmentTimeGap {
-                                                       return
-                                               }
-                                               
d.logger.Info().Time("segment_start", 
s.segmentController.segmentSize.nextTime(t)).Time("event_time", t).Msg("create 
new segment")
-                                               _, err := 
s.segmentController.create(s.segmentController.segmentSize.nextTime(t))
-                                               if err != nil {
-                                                       
d.logger.Error().Err(err).Msgf("failed to create new segment.")
-                                               }
-                                       }(s)
-                               }
+                                       }()
+                                       latest := ss[len(ss)-1]
+                                       gap := latest.End.UnixNano() - ts
+                                       // gap <=0 means the event is from the 
future
+                                       // the segment will be created by a 
written event directly
+                                       if gap <= 0 || gap > newSegmentTimeGap {
+                                               return
+                                       }
+                                       start := 
d.segmentController.opts.SegmentInterval.nextTime(t)
+                                       d.logger.Info().Time("segment_start", 
start).Time("event_time", t).Msg("create new segment")
+                                       _, err := 
d.segmentController.create(start)
+                                       if err != nil {
+                                               
d.logger.Error().Err(err).Msgf("failed to create new segment.")
+                                       }
+                               }()
                        }(ts)
                }
        }(rt)
@@ -115,19 +110,8 @@ func (rc *retentionTask[T, O]) run(now time.Time, l 
*logger.Logger) bool {
                <-rc.running
        }()
 
-       shardList := rc.database.sLst.Load()
-       if shardList == nil {
-               return false
-       }
        deadline := now.Add(-rc.duration)
-
-       for _, shard := range *shardList {
-               if err := shard.segmentController.remove(deadline); err != nil {
-                       l.Error().Err(err)
-               }
-       }
-       stdDeadline := rc.database.opts.TTL.Unit.standard(deadline)
-       if err := rc.database.indexController.run(now, stdDeadline); err != nil 
{
+       if err := rc.database.segmentController.remove(deadline); err != nil {
                l.Error().Err(err)
        }
        return true
diff --git a/banyand/internal/storage/rotation_test.go 
b/banyand/internal/storage/rotation_test.go
index 869e5f63..2c893b59 100644
--- a/banyand/internal/storage/rotation_test.go
+++ b/banyand/internal/storage/rotation_test.go
@@ -62,7 +62,6 @@ func TestRetention(t *testing.T) {
                tsdb, c, segCtrl, dfFn := setUpDB(t)
                defer dfFn()
                ts := c.Now()
-               indexHotStartTime := tsdb.indexController.hot.startTime
                for i := 0; i < 4; i++ {
                        ts = ts.Add(23 * time.Hour)
 
@@ -84,12 +83,6 @@ func TestRetention(t *testing.T) {
                assert.Eventually(t, func() bool {
                        return len(segCtrl.segments()) == 4
                }, flags.EventuallyTimeout, time.Millisecond, "wait for the 1st 
segment to be deleted")
-               assert.Eventually(t, func() bool {
-                       tsdb.indexController.RLock()
-                       defer tsdb.indexController.RUnlock()
-                       ttl := 
tsdb.indexController.hot.startTime.Sub(indexHotStartTime)
-                       return ttl >= 3*24*time.Hour
-               }, flags.EventuallyTimeout, time.Millisecond, "wait for the 
index to be updated")
        })
 
        t.Run("keep the segment volume stable", func(t *testing.T) {
@@ -131,15 +124,6 @@ func TestRetention(t *testing.T) {
                                        ct.Errorf("expect the segment number 
never to exceed 4, got %d", len(ss))
                                        return
                                }
-                               tsdb.indexController.RLock()
-                               indexStartTime := 
tsdb.indexController.hot.startTime
-                               defer tsdb.indexController.RUnlock()
-                               if ts.Sub(indexStartTime) > 3*24*time.Hour {
-                                       ct.Errorf("expect the index to be 
updated, current time %s, index start time %s",
-                                               ts.Format(time.RFC3339), 
indexStartTime.Format(time.RFC3339))
-                                       return
-                               }
-                               t.Logf("current time: %s, index start time: 
%s", ts.Format(time.RFC3339), indexStartTime.Format(time.RFC3339))
                                if tsdb.rotationProcessOn.Load() {
                                        ct.Errorf("expect the rotation process 
to be off")
                                }
@@ -166,15 +150,13 @@ func setUpDB(t *testing.T) (*database[*MockTSTable, any], 
timestamp.MockClock, *
 
        tsdb, err := OpenTSDB(ctx, TSDBOpts)
        require.NoError(t, err)
-       tsTable, err := tsdb.CreateTSTableIfNotExist(0, ts)
+       seg, err := tsdb.CreateSegmentIfNotExist(ts)
        require.NoError(t, err)
-       tsTable.DecRef()
+       defer seg.DecRef()
 
        db := tsdb.(*database[*MockTSTable, any])
-       shard, ok := db.getShard(0)
-       require.True(t, ok)
-       require.Equal(t, len(shard.segmentController.segments()), 1)
-       return db, mc, shard.segmentController, func() {
+       require.Equal(t, len(db.segmentController.segments()), 1)
+       return db, mc, db.segmentController, func() {
                tsdb.Close()
                defFn()
        }
diff --git a/banyand/internal/storage/segment.go 
b/banyand/internal/storage/segment.go
index 8138244d..9153ce3e 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -40,46 +40,80 @@ import (
 // ErrExpiredData is returned when the data is expired.
 var ErrExpiredData = errors.New("expired data")
 
-type segment[T TSTable] struct {
-       tsTable  T
+type segment[T TSTable, O any] struct {
        l        *logger.Logger
+       index    *seriesIndex
+       sLst     atomic.Pointer[[]*shard[T]]
        position common.Position
        timestamp.TimeRange
-       path          string
+       location      string
        suffix        string
+       opts          TSDBOpts[T, O]
        refCount      int32
        mustBeDeleted uint32
        id            segmentID
 }
 
-func openSegment[T TSTable](ctx context.Context, startTime, endTime time.Time, 
path, suffix string,
-       segmentSize IntervalRule, tsTable T, p common.Position,
-) (s *segment[T], err error) {
+func (sc *segmentController[T, O]) openSegment(ctx context.Context, startTime, 
endTime time.Time, path, suffix string,
+       p common.Position, opts TSDBOpts[T, O],
+) (s *segment[T, O], err error) {
        suffixInteger, err := strconv.Atoi(suffix)
        if err != nil {
                return nil, err
        }
-       id := generateSegID(segmentSize.Unit, suffixInteger)
-       timeRange := timestamp.NewSectionTimeRange(startTime, endTime)
-       s = &segment[T]{
+       id := generateSegID(sc.opts.SegmentInterval.Unit, suffixInteger)
+       sir, err := newSeriesIndex(ctx, path, 
sc.opts.SeriesIndexFlushTimeoutSeconds)
+       if err != nil {
+               return nil, errors.Wrap(errOpenDatabase, 
errors.WithMessage(err, "create series index controller failed").Error())
+       }
+       s = &segment[T, O]{
                id:        id,
-               path:      path,
+               location:  path,
                suffix:    suffix,
-               TimeRange: timeRange,
+               TimeRange: timestamp.NewSectionTimeRange(startTime, endTime),
                position:  p,
-               tsTable:   tsTable,
                refCount:  1,
+               index:     sir,
+               opts:      opts,
        }
-       l := logger.Fetch(ctx, s.String())
-       s.l = l
-       return s, nil
+       s.l = logger.Fetch(ctx, s.String())
+       return s, s.loadShards()
 }
 
-func (s *segment[T]) incRef() {
+func (s *segment[T, O]) loadShards() error {
+       return walkDir(s.location, shardPathPrefix, func(suffix string) error {
+               shardID, err := strconv.Atoi(suffix)
+               if err != nil {
+                       return err
+               }
+               if shardID >= int(s.opts.ShardNum) {
+                       return nil
+               }
+               s.l.Info().Int("shard_id", shardID).Msg("loaded a existed 
shard")
+               _, err = s.CreateTSTableIfNotExist(common.ShardID(shardID))
+               return err
+       })
+}
+
+func (s *segment[T, O]) GetTimeRange() timestamp.TimeRange {
+       return s.TimeRange
+}
+
+func (s *segment[T, O]) Tables() (tt []T) {
+       sLst := s.sLst.Load()
+       if sLst != nil {
+               for _, s := range *sLst {
+                       tt = append(tt, s.table)
+               }
+       }
+       return tt
+}
+
+func (s *segment[T, O]) incRef() {
        atomic.AddInt32(&s.refCount, 1)
 }
 
-func (s *segment[T]) DecRef() {
+func (s *segment[T, O]) DecRef() {
        n := atomic.AddInt32(&s.refCount, -1)
        if n > 0 {
                return
@@ -87,11 +121,18 @@ func (s *segment[T]) DecRef() {
 
        deletePath := ""
        if atomic.LoadUint32(&s.mustBeDeleted) != 0 {
-               deletePath = s.path
+               deletePath = s.location
+       }
+
+       if err := s.index.Close(); err != nil {
+               s.l.Panic().Err(err).Msg("failed to close the series index")
        }
 
-       if err := s.tsTable.Close(); err != nil {
-               s.l.Panic().Err(err).Msg("failed to close tsTable")
+       sLst := s.sLst.Load()
+       if sLst != nil {
+               for _, s := range *sLst {
+                       s.close()
+               }
        }
 
        if deletePath != "" {
@@ -99,52 +140,75 @@ func (s *segment[T]) DecRef() {
        }
 }
 
-func (s *segment[T]) Table() T {
-       return s.tsTable
+func (s *segment[T, O]) delete() {
+       atomic.StoreUint32(&s.mustBeDeleted, 1)
+       s.DecRef()
 }
 
-func (s *segment[T]) GetTimeRange() timestamp.TimeRange {
-       return s.TimeRange
+func (s *segment[T, O]) CreateTSTableIfNotExist(id common.ShardID) (T, error) {
+       if s, ok := s.getShard(id); ok {
+               return s.table, nil
+       }
+       ctx := context.WithValue(context.Background(), logger.ContextKey, s.l)
+       ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
+               return s.position
+       })
+       so, err := s.openShard(ctx, id)
+       if err != nil {
+               var t T
+               return t, err
+       }
+       var shardList []*shard[T]
+       sLst := s.sLst.Load()
+       if sLst != nil {
+               shardList = *sLst
+       }
+       shardList = append(shardList, so)
+       s.sLst.Store(&shardList)
+       return so.table, nil
 }
 
-func (s *segment[T]) delete() {
-       atomic.StoreUint32(&s.mustBeDeleted, 1)
-       s.DecRef()
+func (s *segment[T, O]) getShard(shardID common.ShardID) (*shard[T], bool) {
+       sLst := s.sLst.Load()
+       if sLst != nil {
+               for _, s := range *sLst {
+                       if s.id == shardID {
+                               return s, true
+                       }
+               }
+       }
+       return nil, false
 }
 
-func (s *segment[T]) String() string {
+func (s *segment[T, O]) String() string {
        return "SegID-" + s.suffix
 }
 
 type segmentController[T TSTable, O any] struct {
-       clock          timestamp.Clock
-       option         O
-       l              *logger.Logger
-       tsTableCreator TSTableCreator[T, O]
-       position       common.Position
-       location       string
-       lst            []*segment[T]
-       segmentSize    IntervalRule
-       deadline       atomic.Int64
+       clock    timestamp.Clock
+       l        *logger.Logger
+       position common.Position
+       location string
+       lst      []*segment[T, O]
+       opts     TSDBOpts[T, O]
+       deadline atomic.Int64
        sync.RWMutex
 }
 
 func newSegmentController[T TSTable, O any](ctx context.Context, location 
string,
-       segmentSize IntervalRule, l *logger.Logger, tsTableCreator 
TSTableCreator[T, O], option O,
+       l *logger.Logger, opts TSDBOpts[T, O],
 ) *segmentController[T, O] {
        clock, _ := timestamp.GetClock(ctx)
        return &segmentController[T, O]{
-               location:       location,
-               segmentSize:    segmentSize,
-               l:              l,
-               clock:          clock,
-               position:       common.GetPosition(ctx),
-               tsTableCreator: tsTableCreator,
-               option:         option,
+               location: location,
+               opts:     opts,
+               l:        l,
+               clock:    clock,
+               position: common.GetPosition(ctx),
        }
 }
 
-func (sc *segmentController[T, O]) selectTSTables(timeRange 
timestamp.TimeRange) (tt []TSTableWrapper[T]) {
+func (sc *segmentController[T, O]) selectSegments(timeRange 
timestamp.TimeRange) (tt []Segment[T, O]) {
        sc.RLock()
        defer sc.RUnlock()
        last := len(sc.lst) - 1
@@ -158,7 +222,7 @@ func (sc *segmentController[T, O]) selectTSTables(timeRange 
timestamp.TimeRange)
        return tt
 }
 
-func (sc *segmentController[T, O]) createTSTable(ts time.Time) 
(TSTableWrapper[T], error) {
+func (sc *segmentController[T, O]) createSegment(ts time.Time) (*segment[T, 
O], error) {
        // Before the first remove old segment run, any segment should be 
created.
        if sc.deadline.Load() > ts.UnixNano() {
                return nil, ErrExpiredData
@@ -171,10 +235,10 @@ func (sc *segmentController[T, O]) createTSTable(ts 
time.Time) (TSTableWrapper[T
        return s, nil
 }
 
-func (sc *segmentController[T, O]) segments() (ss []*segment[T]) {
+func (sc *segmentController[T, O]) segments() (ss []*segment[T, O]) {
        sc.RLock()
        defer sc.RUnlock()
-       r := make([]*segment[T], len(sc.lst))
+       r := make([]*segment[T, O], len(sc.lst))
        for i := range sc.lst {
                sc.lst[i].incRef()
                r[i] = sc.lst[i]
@@ -182,8 +246,8 @@ func (sc *segmentController[T, O]) segments() (ss 
[]*segment[T]) {
        return r
 }
 
-func (sc *segmentController[T, O]) Format(tm time.Time) string {
-       switch sc.segmentSize.Unit {
+func (sc *segmentController[T, O]) format(tm time.Time) string {
+       switch sc.opts.SegmentInterval.Unit {
        case HOUR:
                return tm.Format(hourFormat)
        case DAY:
@@ -192,8 +256,8 @@ func (sc *segmentController[T, O]) Format(tm time.Time) 
string {
        panic("invalid interval unit")
 }
 
-func (sc *segmentController[T, O]) Parse(value string) (time.Time, error) {
-       switch sc.segmentSize.Unit {
+func (sc *segmentController[T, O]) parse(value string) (time.Time, error) {
+       switch sc.opts.SegmentInterval.Unit {
        case HOUR:
                return time.ParseInLocation(hourFormat, value, time.Local)
        case DAY:
@@ -206,8 +270,8 @@ func (sc *segmentController[T, O]) open() error {
        sc.Lock()
        defer sc.Unlock()
        emptySegments := make([]string, 0)
-       err := loadSegments(sc.location, segPathPrefix, sc, sc.segmentSize, 
func(start, end time.Time) error {
-               suffix := sc.Format(start)
+       err := loadSegments(sc.location, segPathPrefix, sc, 
sc.opts.SegmentInterval, func(start, end time.Time) error {
+               suffix := sc.format(start)
                segmentPath := path.Join(sc.location, fmt.Sprintf(segTemplate, 
suffix))
                metadataPath := path.Join(segmentPath, metadataFilename)
                version, err := lfs.Read(metadataPath)
@@ -237,7 +301,7 @@ func (sc *segmentController[T, O]) open() error {
        return err
 }
 
-func (sc *segmentController[T, O]) create(start time.Time) (*segment[T], 
error) {
+func (sc *segmentController[T, O]) create(start time.Time) (*segment[T, O], 
error) {
        sc.Lock()
        defer sc.Unlock()
        last := len(sc.lst) - 1
@@ -247,8 +311,8 @@ func (sc *segmentController[T, O]) create(start time.Time) 
(*segment[T], error)
                        return s, nil
                }
        }
-       start = sc.segmentSize.Unit.standard(start)
-       var next *segment[T]
+       start = sc.opts.SegmentInterval.Unit.standard(start)
+       var next *segment[T, O]
        for _, s := range sc.lst {
                if s.Contains(start.UnixNano()) {
                        return s, nil
@@ -257,14 +321,14 @@ func (sc *segmentController[T, O]) create(start 
time.Time) (*segment[T], error)
                        next = s
                }
        }
-       stdEnd := sc.segmentSize.nextTime(start)
+       stdEnd := sc.opts.SegmentInterval.nextTime(start)
        var end time.Time
        if next != nil && next.Start.Before(stdEnd) {
                end = next.Start
        } else {
                end = stdEnd
        }
-       segPath := path.Join(sc.location, fmt.Sprintf(segTemplate, 
sc.Format(start)))
+       segPath := path.Join(sc.location, fmt.Sprintf(segTemplate, 
sc.format(start)))
        lfs.MkdirPanicIfExist(segPath, dirPerm)
        data := []byte(currentVersion)
        metadataPath := filepath.Join(segPath, metadataFilename)
@@ -288,16 +352,12 @@ func (sc *segmentController[T, O]) sortLst() {
        })
 }
 
-func (sc *segmentController[T, O]) load(start, end time.Time, root string) 
(seg *segment[T], err error) {
-       suffix := sc.Format(start)
+func (sc *segmentController[T, O]) load(start, end time.Time, root string) 
(seg *segment[T, O], err error) {
+       suffix := sc.format(start)
        segPath := path.Join(root, fmt.Sprintf(segTemplate, suffix))
-       var tsTable T
        p := sc.position
        p.Segment = suffix
-       if tsTable, err = sc.tsTableCreator(lfs, segPath, p, sc.l, 
timestamp.NewSectionTimeRange(start, end), sc.option); err != nil {
-               return nil, err
-       }
-       seg, err = openSegment[T](context.WithValue(context.Background(), 
logger.ContextKey, sc.l), start, end, segPath, suffix, sc.segmentSize, tsTable, 
p)
+       seg, err = sc.openSegment(context.WithValue(context.Background(), 
logger.ContextKey, sc.l), start, end, segPath, suffix, p, sc.opts)
        if err != nil {
                return nil, err
        }
@@ -343,17 +403,13 @@ func (sc *segmentController[T, O]) close() {
        sc.lst = sc.lst[:0]
 }
 
-type parser interface {
-       Parse(value string) (time.Time, error)
-}
-
-func loadSegments(root, prefix string, parser parser, intervalRule 
IntervalRule, loadFn func(start, end time.Time) error) error {
+func loadSegments[T TSTable, O any](root, prefix string, parser 
*segmentController[T, O], intervalRule IntervalRule, loadFn func(start, end 
time.Time) error) error {
        var startTimeLst []time.Time
        if err := walkDir(
                root,
                prefix,
                func(suffix string) error {
-                       startTime, err := parser.Parse(suffix)
+                       startTime, err := parser.parse(suffix)
                        if err != nil {
                                return err
                        }
diff --git a/banyand/internal/storage/shard.go 
b/banyand/internal/storage/shard.go
index d4e32023..a3dc8e6a 100644
--- a/banyand/internal/storage/shard.go
+++ b/banyand/internal/storage/shard.go
@@ -22,48 +22,45 @@ import (
        "fmt"
        "path"
        "strconv"
-       "sync"
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
-type shard[T TSTable, O any] struct {
-       l                 *logger.Logger
-       segmentController *segmentController[T, O]
-       position          common.Position
-       closeOnce         sync.Once
-       id                common.ShardID
+type shard[T TSTable] struct {
+       table     T
+       l         *logger.Logger
+       timeRange timestamp.TimeRange
+       location  string
+       id        common.ShardID
 }
 
-func (d *database[T, O]) openShard(ctx context.Context, id common.ShardID) 
(*shard[T, O], error) {
-       location := path.Join(d.location, fmt.Sprintf(shardTemplate, int(id)))
+func (s *segment[T, O]) openShard(ctx context.Context, id common.ShardID) 
(*shard[T], error) {
+       location := path.Join(s.location, fmt.Sprintf(shardTemplate, int(id)))
        lfs.MkdirIfNotExist(location, dirPerm)
        l := logger.Fetch(ctx, "shard"+strconv.Itoa(int(id)))
        l.Info().Int("shard_id", int(id)).Str("path", location).Msg("creating a 
shard")
-       shardCtx := context.WithValue(ctx, logger.ContextKey, l)
-       shardCtx = common.SetPosition(shardCtx, func(p common.Position) 
common.Position {
-               p.Shard = strconv.Itoa(int(id))
-               return p
-       })
-
-       s := &shard[T, O]{
-               id:       id,
-               l:        l,
-               position: common.GetPosition(shardCtx),
-               segmentController: newSegmentController[T](shardCtx, location,
-                       d.opts.SegmentInterval, l,
-                       d.opts.TSTableCreator, d.opts.Option),
-       }
-       var err error
-       if err = s.segmentController.open(); err != nil {
+       p := common.GetPosition(ctx)
+       p.Shard = strconv.Itoa(int(id))
+       t, err := s.opts.TSTableCreator(lfs, location, p, l, s.TimeRange, 
s.opts.Option)
+       if err != nil {
                return nil, err
        }
-       return s, nil
+
+       return &shard[T]{
+               id:        id,
+               l:         l,
+               table:     t,
+               timeRange: s.TimeRange,
+               location:  location,
+       }, nil
+}
+
+func (s *shard[T]) Table() T {
+       return s.table
 }
 
-func (s *shard[T, O]) close() {
-       s.closeOnce.Do(func() {
-               s.segmentController.close()
-       })
+func (s *shard[T]) close() error {
+       return s.table.Close()
 }
diff --git a/banyand/internal/storage/storage.go 
b/banyand/internal/storage/storage.go
index 935dbe0e..7ab4dc3c 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -72,11 +72,19 @@ type IndexDB interface {
 // TSDB allows listing and getting shard details.
 type TSDB[T TSTable, O any] interface {
        io.Closer
+       CreateSegmentIfNotExist(ts time.Time) (Segment[T, O], error)
+       SelectSegments(timeRange timestamp.TimeRange) []Segment[T, O]
+       Tick(ts int64)
+}
+
+// Segment is a time range of data.
+type Segment[T TSTable, O any] interface {
+       DecRef()
+       GetTimeRange() timestamp.TimeRange
+       CreateTSTableIfNotExist(shardID common.ShardID) (T, error)
+       Tables() []T
        Lookup(ctx context.Context, series []*pbv1.Series) (pbv1.SeriesList, 
error)
-       CreateTSTableIfNotExist(shardID common.ShardID, ts time.Time) 
(TSTableWrapper[T], error)
-       SelectTSTables(timeRange timestamp.TimeRange) []TSTableWrapper[T]
        IndexDB() IndexDB
-       Tick(ts int64)
 }
 
 // TSTable is time series table.
@@ -84,14 +92,6 @@ type TSTable interface {
        io.Closer
 }
 
-// TSTableWrapper is a wrapper of TSTable.
-// It is used to manage the reference count of TSTable.
-type TSTableWrapper[T TSTable] interface {
-       DecRef()
-       Table() T
-       GetTimeRange() timestamp.TimeRange
-}
-
 // TSTableCreator creates a TSTable.
 type TSTableCreator[T TSTable, O any] func(fileSystem fs.FileSystem, root 
string, position common.Position,
        l *logger.Logger, timeRange timestamp.TimeRange, option O) (T, error)
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index ba53b107..39ed5ba3 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -20,7 +20,6 @@ package storage
 import (
        "context"
        "path/filepath"
-       "strconv"
        "strings"
        "sync"
        "sync/atomic"
@@ -68,16 +67,15 @@ func generateSegID(unit IntervalUnit, suffix int) segmentID 
{
 }
 
 type database[T TSTable, O any] struct {
-       lock            fs.File
-       logger          *logger.Logger
-       indexController *seriesIndexController[T, O]
-       scheduler       *timestamp.Scheduler
-       sLst            atomic.Pointer[[]*shard[T, O]]
-       tsEventCh       chan int64
-       p               common.Position
-       location        string
-       opts            TSDBOpts[T, O]
-       latestTickTime  atomic.Int64
+       lock              fs.File
+       logger            *logger.Logger
+       scheduler         *timestamp.Scheduler
+       tsEventCh         chan int64
+       segmentController *segmentController[T, O]
+       p                 common.Position
+       location          string
+       opts              TSDBOpts[T, O]
+       latestTickTime    atomic.Int64
        sync.RWMutex
        rotationProcessOn atomic.Bool
 }
@@ -86,18 +84,13 @@ func (d *database[T, O]) Close() error {
        d.Lock()
        defer d.Unlock()
        d.scheduler.Close()
-       sLst := d.sLst.Load()
-       if sLst != nil {
-               for _, s := range *sLst {
-                       s.close()
-               }
-       }
        close(d.tsEventCh)
+       d.segmentController.close()
        d.lock.Close()
        if err := lfs.DeleteFile(d.lock.Path()); err != nil {
                logger.Panicf("cannot delete lock file %s: %s", d.lock.Path(), 
err)
        }
-       return d.indexController.Close()
+       return nil
 }
 
 // OpenTSDB returns a new tsdb runtime. This constructor will create a new 
database if it's absent,
@@ -112,21 +105,18 @@ func OpenTSDB[T TSTable, O any](ctx context.Context, opts 
TSDBOpts[T, O]) (TSDB[
        p := common.GetPosition(ctx)
        location := filepath.Clean(opts.Location)
        lfs.MkdirIfNotExist(location, dirPerm)
-       sir, err := newSeriesIndexController(ctx, opts)
-       if err != nil {
-               return nil, errors.Wrap(errOpenDatabase, 
errors.WithMessage(err, "create series index controller failed").Error())
-       }
        l := logger.Fetch(ctx, p.Database)
        clock, _ := timestamp.GetClock(ctx)
        scheduler := timestamp.NewScheduler(l, clock)
        db := &database[T, O]{
-               location:        location,
-               scheduler:       scheduler,
-               logger:          l,
-               indexController: sir,
-               opts:            opts,
-               tsEventCh:       make(chan int64),
-               p:               p,
+               location:  location,
+               scheduler: scheduler,
+               logger:    l,
+               opts:      opts,
+               tsEventCh: make(chan int64),
+               p:         p,
+               segmentController: newSegmentController[T](ctx, location,
+                       l, opts),
        }
        db.logger.Info().Str("path", opts.Location).Msg("initialized")
        lockPath := filepath.Join(opts.Location, lockFilename)
@@ -135,101 +125,18 @@ func OpenTSDB[T TSTable, O any](ctx context.Context, 
opts TSDBOpts[T, O]) (TSDB[
                logger.Panicf("cannot create lock file %s: %s", lockPath, err)
        }
        db.lock = lock
-       if err = db.loadDatabase(); err != nil {
-               return nil, errors.Wrap(errOpenDatabase, 
errors.WithMessage(err, "load database failed").Error())
-       }
-       return db, db.startRotationTask()
-}
-
-func (d *database[T, O]) CreateTSTableIfNotExist(shardID common.ShardID, ts 
time.Time) (TSTableWrapper[T], error) {
-       if s, ok := d.getShard(shardID); ok {
-               d.RLock()
-               defer d.RUnlock()
-               return d.createTSTTable(s, ts)
-       }
-       d.Lock()
-       defer d.Unlock()
-       if s, ok := d.getShard(shardID); ok {
-               return d.createTSTTable(s, ts)
-       }
-       d.logger.Info().Int("shard_id", int(shardID)).Msg("creating a shard")
-       s, err := d.registerShard(shardID)
-       if err != nil {
+       if err := db.segmentController.open(); err != nil {
                return nil, err
        }
-       return d.createTSTTable(s, ts)
-}
-
-func (d *database[T, O]) getShard(shardID common.ShardID) (*shard[T, O], bool) 
{
-       sLst := d.sLst.Load()
-       if sLst != nil {
-               for _, s := range *sLst {
-                       if s.id == shardID {
-                               return s, true
-                       }
-               }
-       }
-       return nil, false
+       return db, db.startRotationTask()
 }
 
-func (d *database[T, O]) createTSTTable(shard *shard[T, O], ts time.Time) 
(TSTableWrapper[T], error) {
-       timeRange := timestamp.NewInclusiveTimeRange(ts, ts)
-       ss := shard.segmentController.selectTSTables(timeRange)
-       if len(ss) > 0 {
-               return ss[0], nil
-       }
-       return shard.segmentController.createTSTable(ts)
+func (d *database[T, O]) CreateSegmentIfNotExist(ts time.Time) (Segment[T, O], 
error) {
+       return d.segmentController.createSegment(ts)
 }
 
-func (d *database[T, O]) SelectTSTables(timeRange timestamp.TimeRange) 
[]TSTableWrapper[T] {
-       var result []TSTableWrapper[T]
-       sLst := d.sLst.Load()
-       if sLst == nil {
-               return result
-       }
-       for _, s := range *sLst {
-               result = append(result, 
s.segmentController.selectTSTables(timeRange)...)
-       }
-       return result
-}
-
-func (d *database[T, O]) registerShard(id common.ShardID) (*shard[T, O], 
error) {
-       if s, ok := d.getShard(id); ok {
-               return s, nil
-       }
-       ctx := context.WithValue(context.Background(), logger.ContextKey, 
d.logger)
-       ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
-               return d.p
-       })
-       so, err := d.openShard(ctx, id)
-       if err != nil {
-               return nil, err
-       }
-       var shardList []*shard[T, O]
-       sLst := d.sLst.Load()
-       if sLst != nil {
-               shardList = *sLst
-       }
-       shardList = append(shardList, so)
-       d.sLst.Store(&shardList)
-       return so, nil
-}
-
-func (d *database[T, O]) loadDatabase() error {
-       d.Lock()
-       defer d.Unlock()
-       return walkDir(d.location, shardPathPrefix, func(suffix string) error {
-               shardID, err := strconv.Atoi(suffix)
-               if err != nil {
-                       return err
-               }
-               if shardID >= int(d.opts.ShardNum) {
-                       return nil
-               }
-               d.logger.Info().Int("shard_id", shardID).Msg("loaded a existed 
shard")
-               _, err = d.registerShard(common.ShardID(shardID))
-               return err
-       })
+func (d *database[T, O]) SelectSegments(timeRange timestamp.TimeRange) 
[]Segment[T, O] {
+       return d.segmentController.selectSegments(timeRange)
 }
 
 type walkFn func(suffix string) error
diff --git a/banyand/measure/datapoints.go b/banyand/measure/datapoints.go
index 3996e3a0..03b07093 100644
--- a/banyand/measure/datapoints.go
+++ b/banyand/measure/datapoints.go
@@ -144,15 +144,15 @@ func (d *dataPoints) Swap(i, j int) {
 
 type dataPointsInTable struct {
        timeRange timestamp.TimeRange
-       tsTable   storage.TSTableWrapper[*tsTable]
+       tsTable   *tsTable
 
        dataPoints dataPoints
 }
 
 type dataPointsInGroup struct {
-       tsdb storage.TSDB[*tsTable, option]
-
+       tsdb     storage.TSDB[*tsTable, option]
        docs     index.Documents
        tables   []*dataPointsInTable
+       segments []storage.Segment[*tsTable, option]
        latestTS int64
 }
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index 785d772e..f2a8c2ba 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -74,13 +74,7 @@ func (s *measure) Query(ctx context.Context, mqo 
pbv1.MeasureQueryOptions) (mqr
        if db == nil {
                return mqr, nil
        }
-       tsdb := db.(storage.TSDB[*tsTable, option])
-       tabWrappers := tsdb.SelectTSTables(*mqo.TimeRange)
-       defer func() {
-               for i := range tabWrappers {
-                       tabWrappers[i].DecRef()
-               }
-       }()
+
        series := make([]*pbv1.Series, len(mqo.Entities))
        for i := range mqo.Entities {
                series[i] = &pbv1.Series{
@@ -88,13 +82,24 @@ func (s *measure) Query(ctx context.Context, mqo 
pbv1.MeasureQueryOptions) (mqr
                        EntityValues: mqo.Entities[i],
                }
        }
+       var result queryResult
+       tsdb := db.(storage.TSDB[*tsTable, option])
+       result.segments = tsdb.SelectSegments(*mqo.TimeRange)
+       if len(result.segments) < 1 {
+               return &result, nil
+       }
+       defer func() {
+               if err != nil {
+                       result.Release()
+               }
+       }()
 
-       sl, err := tsdb.IndexDB().Search(ctx, series, mqo.Filter, mqo.Order, 
preloadSize)
+       sl, tables, err := s.searchSeriesList(ctx, series, mqo, result.segments)
        if err != nil {
                return nil, err
        }
        if len(sl) < 1 {
-               return mqr, nil
+               return &result, nil
        }
        var sids []common.SeriesID
        for i := range sl {
@@ -107,9 +112,8 @@ func (s *measure) Query(ctx context.Context, mqo 
pbv1.MeasureQueryOptions) (mqr
                maxTimestamp:        mqo.TimeRange.End.UnixNano(),
        }
        var n int
-       var result queryResult
-       for i := range tabWrappers {
-               s := tabWrappers[i].Table().currentSnapshot()
+       for i := range tables {
+               s := tables[i].currentSnapshot()
                if s == nil {
                        continue
                }
@@ -121,54 +125,7 @@ func (s *measure) Query(ctx context.Context, mqo 
pbv1.MeasureQueryOptions) (mqr
                result.snapshots = append(result.snapshots, s)
        }
 
-       func() {
-               bma := generateBlockMetadataArray()
-               defer releaseBlockMetadataArray(bma)
-               defFn := startBlockScanSpan(ctx, len(sids), parts, &result)
-               defer defFn()
-               // TODO: cache tstIter
-               var tstIter tstIter
-               defer tstIter.reset()
-               originalSids := make([]common.SeriesID, len(sids))
-               copy(originalSids, sids)
-               sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j] 
})
-               tstIter.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp)
-               if tstIter.Error() != nil {
-                       err = fmt.Errorf("cannot init tstIter: %w", 
tstIter.Error())
-                       return
-               }
-               projectedEntityOffsets, tagProjectionOnPart := 
s.parseTagProjection(qo, &result)
-               result.tagProjection = qo.TagProjection
-               qo.TagProjection = tagProjectionOnPart
-
-               for tstIter.nextBlock() {
-                       bc := generateBlockCursor()
-                       p := tstIter.piHeap[0]
-
-                       seriesID := p.curBlock.seriesID
-                       if result.entityValues != nil && 
result.entityValues[seriesID] == nil {
-                               for i := range sl {
-                                       if sl[i].ID == seriesID {
-                                               tag := 
make(map[string]*modelv1.TagValue)
-                                               for name, offset := range 
projectedEntityOffsets {
-                                                       tag[name] = 
sl[i].EntityValues[offset]
-                                               }
-                                               result.entityValues[seriesID] = 
tag
-                                       }
-                               }
-                       }
-                       bc.init(p.p, p.curBlock, qo)
-                       result.data = append(result.data, bc)
-               }
-               if tstIter.Error() != nil {
-                       err = fmt.Errorf("cannot iterate tstIter: %w", 
tstIter.Error())
-               }
-               result.sidToIndex = make(map[common.SeriesID]int)
-               for i, si := range originalSids {
-                       result.sidToIndex[si] = i
-               }
-       }()
-       if err != nil {
+       if err = s.searchBlocks(ctx, &result, sl, sids, parts, qo); err != nil {
                return nil, err
        }
 
@@ -188,6 +145,68 @@ func (s *measure) Query(ctx context.Context, mqo 
pbv1.MeasureQueryOptions) (mqr
        return &result, nil
 }
 
+func (s *measure) searchSeriesList(ctx context.Context, series []*pbv1.Series, 
mqo pbv1.MeasureQueryOptions,
+       segments []storage.Segment[*tsTable, option],
+) (sl pbv1.SeriesList, tables []*tsTable, err error) {
+       for i := range segments {
+               tables = append(tables, segments[i].Tables()...)
+               sll, err := segments[i].IndexDB().Search(ctx, series, 
mqo.Filter, mqo.Order, preloadSize)
+               if err != nil {
+                       return nil, nil, err
+               }
+               sl = append(sl, sll...)
+       }
+       return sl, tables, nil
+}
+
+func (s *measure) searchBlocks(ctx context.Context, result *queryResult, sl 
pbv1.SeriesList, sids []common.SeriesID, parts []*part, qo queryOptions) error {
+       bma := generateBlockMetadataArray()
+       defer releaseBlockMetadataArray(bma)
+       defFn := startBlockScanSpan(ctx, len(sids), parts, result)
+       defer defFn()
+       // TODO: cache tstIter
+       var tstIter tstIter
+       defer tstIter.reset()
+       originalSids := make([]common.SeriesID, len(sids))
+       copy(originalSids, sids)
+       sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j] })
+       tstIter.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp)
+       if tstIter.Error() != nil {
+               return fmt.Errorf("cannot init tstIter: %w", tstIter.Error())
+       }
+       projectedEntityOffsets, tagProjectionOnPart := s.parseTagProjection(qo, 
result)
+       result.tagProjection = qo.TagProjection
+       qo.TagProjection = tagProjectionOnPart
+
+       for tstIter.nextBlock() {
+               bc := generateBlockCursor()
+               p := tstIter.piHeap[0]
+
+               seriesID := p.curBlock.seriesID
+               if result.entityValues != nil && result.entityValues[seriesID] 
== nil {
+                       for i := range sl {
+                               if sl[i].ID == seriesID {
+                                       tag := 
make(map[string]*modelv1.TagValue)
+                                       for name, offset := range 
projectedEntityOffsets {
+                                               tag[name] = 
sl[i].EntityValues[offset]
+                                       }
+                                       result.entityValues[seriesID] = tag
+                               }
+                       }
+               }
+               bc.init(p.p, p.curBlock, qo)
+               result.data = append(result.data, bc)
+       }
+       if tstIter.Error() != nil {
+               return fmt.Errorf("cannot iterate tstIter: %w", tstIter.Error())
+       }
+       result.sidToIndex = make(map[common.SeriesID]int)
+       for i, si := range originalSids {
+               result.sidToIndex[si] = i
+       }
+       return nil
+}
+
 func (s *measure) parseTagProjection(qo queryOptions, result *queryResult) 
(projectedEntityOffsets map[string]int, tagProjectionOnPart 
[]pbv1.TagProjection) {
        projectedEntityOffsets = make(map[string]int)
        for i := range qo.TagProjection {
@@ -373,6 +392,7 @@ type queryResult struct {
        tagProjection []pbv1.TagProjection
        data          []*blockCursor
        snapshots     []*snapshot
+       segments      []storage.Segment[*tsTable, option]
        loaded        bool
        orderByTS     bool
        ascTS         bool
@@ -440,6 +460,9 @@ func (qr *queryResult) Release() {
                qr.snapshots[i].decRef()
        }
        qr.snapshots = qr.snapshots[:0]
+       for i := range qr.segments {
+               qr.segments[i].DecRef()
+       }
 }
 
 func (qr queryResult) Len() int {
diff --git a/banyand/measure/write.go b/banyand/measure/write.go
index 4c50c47f..5b2e695d 100644
--- a/banyand/measure/write.go
+++ b/banyand/measure/write.go
@@ -28,6 +28,7 @@ import (
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/index"
@@ -64,8 +65,9 @@ func (w *writeCallback) handle(dst 
map[string]*dataPointsInGroup, writeEvent *me
        dpg, ok := dst[gn]
        if !ok {
                dpg = &dataPointsInGroup{
-                       tsdb:   tsdb,
-                       tables: make([]*dataPointsInTable, 0),
+                       tsdb:     tsdb,
+                       tables:   make([]*dataPointsInTable, 0),
+                       segments: make([]storage.Segment[*tsTable, option], 0),
                }
                dst[gn] = dpg
        }
@@ -82,15 +84,9 @@ func (w *writeCallback) handle(dst 
map[string]*dataPointsInGroup, writeEvent *me
        }
        shardID := common.ShardID(writeEvent.ShardId)
        if dpt == nil {
-               tstb, err := tsdb.CreateTSTableIfNotExist(shardID, t)
-               if err != nil {
-                       return nil, fmt.Errorf("cannot create ts table: %w", 
err)
-               }
-               dpt = &dataPointsInTable{
-                       timeRange: tstb.GetTimeRange(),
-                       tsTable:   tstb,
+               if dpt, err = w.newDpt(tsdb, dpg, t, ts, shardID); err != nil {
+                       return nil, fmt.Errorf("cannot create data points in 
table: %w", err)
                }
-               dpg.tables = append(dpg.tables, dpt)
        }
        dpt.dataPoints.timestamps = append(dpt.dataPoints.timestamps, ts)
        dpt.dataPoints.versions = append(dpt.dataPoints.versions, 
req.DataPoint.Version)
@@ -211,6 +207,33 @@ func (w *writeCallback) handle(dst 
map[string]*dataPointsInGroup, writeEvent *me
        return dst, nil
 }
 
+func (w *writeCallback) newDpt(tsdb storage.TSDB[*tsTable, option], dpg 
*dataPointsInGroup, t time.Time, ts int64, shardID common.ShardID) 
(*dataPointsInTable, error) {
+       var segment storage.Segment[*tsTable, option]
+       for _, seg := range dpg.segments {
+               if seg.GetTimeRange().Contains(ts) {
+                       segment = seg
+               }
+       }
+       if segment == nil {
+               var err error
+               segment, err = tsdb.CreateSegmentIfNotExist(t)
+               if err != nil {
+                       return nil, fmt.Errorf("cannot create segment: %w", err)
+               }
+               dpg.segments = append(dpg.segments, segment)
+       }
+       tstb, err := segment.CreateTSTableIfNotExist(shardID)
+       if err != nil {
+               return nil, fmt.Errorf("cannot create ts table: %w", err)
+       }
+       dpt := &dataPointsInTable{
+               timeRange: segment.GetTimeRange(),
+               tsTable:   tstb,
+       }
+       dpg.tables = append(dpg.tables, dpt)
+       return dpt, nil
+}
+
 func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
        events, ok := message.Data().([]any)
        if !ok {
@@ -246,15 +269,17 @@ func (w *writeCallback) Rev(message bus.Message) (resp 
bus.Message) {
        }
        for i := range groups {
                g := groups[i]
-               g.tsdb.Tick(g.latestTS)
                for j := range g.tables {
                        dps := g.tables[j]
-                       dps.tsTable.Table().mustAddDataPoints(&dps.dataPoints)
-                       dps.tsTable.DecRef()
+                       dps.tsTable.mustAddDataPoints(&dps.dataPoints)
                }
-               if err := g.tsdb.IndexDB().Write(g.docs); err != nil {
-                       w.l.Error().Err(err).Msg("cannot write index")
+               for _, segment := range g.segments {
+                       if err := segment.IndexDB().Write(g.docs); err != nil {
+                               w.l.Error().Err(err).Msg("cannot write index")
+                       }
+                       segment.DecRef()
                }
+               g.tsdb.Tick(g.latestTS)
        }
        return
 }
diff --git a/banyand/stream/benchmark_test.go b/banyand/stream/benchmark_test.go
index f2fb7c34..633593da 100644
--- a/banyand/stream/benchmark_test.go
+++ b/banyand/stream/benchmark_test.go
@@ -23,7 +23,6 @@ import (
        "io"
        "math"
        "math/big"
-       "path/filepath"
        "strconv"
        "sync/atomic"
        "testing"
@@ -37,11 +36,9 @@ import (
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/pkg/convert"
-       "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/index"
        "github.com/apache/skywalking-banyandb/pkg/index/posting"
        "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
-       "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        logicalstream 
"github.com/apache/skywalking-banyandb/pkg/query/logical/stream"
        "github.com/apache/skywalking-banyandb/pkg/test"
@@ -195,41 +192,8 @@ func openDatabase(b *testing.B, path string) 
storage.TSDB[*tsTable, option] {
 func write(b *testing.B, p parameter, esList []*elements, docsList 
[]index.Documents) storage.TSDB[*tsTable, option] {
        // Initialize a tstIter object.
        tmpPath, defFn := test.Space(require.New(b))
-       segmentPath := filepath.Join(tmpPath, "shard-0", "seg-19700101")
-       fileSystem := fs.NewLocalFileSystem()
        defer defFn()
-       tst, err := newTSTable(fileSystem, segmentPath, common.Position{},
-               // Since Stream deduplicate data in merging process, we need to 
disable the merging in the test.
-               logger.GetLogger("benchmark"), timestamp.TimeRange{}, 
option{flushTimeout: 0, mergePolicy: newDisabledMergePolicyForTesting()})
-       require.NoError(b, err)
-       for i := 0; i < len(esList); i++ {
-               tst.mustAddElements(esList[i])
-               tst.index.Write(docsList[i])
-               time.Sleep(100 * time.Millisecond)
-       }
-       // wait until the introducer is done
-       if len(esList) > 0 {
-               for {
-                       snp := tst.currentSnapshot()
-                       if snp == nil {
-                               time.Sleep(100 * time.Millisecond)
-                               continue
-                       }
-                       if snp.creator != snapshotCreatorMemPart && 
len(snp.parts) == len(esList) {
-                               snp.decRef()
-                               tst.Close()
-                               break
-                       }
-                       snp.decRef()
-                       time.Sleep(100 * time.Millisecond)
-               }
-       }
-       data := []byte(version)
-       metadataPath := filepath.Join(segmentPath, segmentMetadataFilename)
-       lf, err := fileSystem.CreateLockFile(metadataPath, filePermission)
-       require.NoError(b, err)
-       _, err = lf.Write(data)
-       require.NoError(b, err)
+
        db := openDatabase(b, tmpPath)
        var docs index.Documents
        for i := 1; i <= p.seriesCount; i++ {
@@ -246,13 +210,22 @@ func write(b *testing.B, p parameter, esList []*elements, 
docsList []index.Docum
                        Subject:      "benchmark",
                        EntityValues: entity,
                }
-               err = series.Marshal()
+               err := series.Marshal()
                require.NoError(b, err)
                docs = append(docs, index.Document{
                        DocID:        uint64(i),
                        EntityValues: series.Buffer,
                })
-               db.IndexDB().Write(docs)
+       }
+       seg, err := db.CreateSegmentIfNotExist(time.Unix(0, 
esList[0].timestamps[0]))
+       require.NoError(b, err)
+       require.NoError(b, seg.IndexDB().Write(docs))
+
+       tst, err := seg.CreateTSTableIfNotExist(common.ShardID(0))
+       require.NoError(b, err)
+       for i := range esList {
+               tst.mustAddElements(esList[i])
+               tst.Index().Write(docsList[i])
        }
        return db
 }
diff --git a/banyand/stream/elements.go b/banyand/stream/elements.go
index 075a3b67..a45a1240 100644
--- a/banyand/stream/elements.go
+++ b/banyand/stream/elements.go
@@ -142,15 +142,17 @@ func (e *elements) Swap(i, j int) {
 
 type elementsInTable struct {
        timeRange timestamp.TimeRange
-       tsTable   storage.TSTableWrapper[*tsTable]
+       tsTable   *tsTable
 
        elements elements
-       docs     index.Documents
+
+       docs index.Documents
 }
 
 type elementsInGroup struct {
        tsdb     storage.TSDB[*tsTable, option]
        docs     index.Documents
        tables   []*elementsInTable
+       segments []storage.Segment[*tsTable, option]
        latestTS int64
 }
diff --git a/banyand/stream/query.go b/banyand/stream/query.go
index ef7925b9..e50d7b84 100644
--- a/banyand/stream/query.go
+++ b/banyand/stream/query.go
@@ -53,12 +53,16 @@ func (s *stream) Query(ctx context.Context, sqo 
pbv1.StreamQueryOptions) (sqr pb
        }
        var result queryResult
        tsdb := db.(storage.TSDB[*tsTable, option])
-       result.tabWrappers = tsdb.SelectTSTables(*sqo.TimeRange)
+       result.segments = tsdb.SelectSegments(*sqo.TimeRange)
+       if len(result.segments) < 1 {
+               return &result, nil
+       }
        defer func() {
-               if sqr == nil {
+               if err != nil {
                        result.Release()
                }
        }()
+       var tables []*tsTable
 
        series := make([]*pbv1.Series, len(sqo.Entities))
        for i := range sqo.Entities {
@@ -67,12 +71,19 @@ func (s *stream) Query(ctx context.Context, sqo 
pbv1.StreamQueryOptions) (sqr pb
                        EntityValues: sqo.Entities[i],
                }
        }
-       seriesList, err := tsdb.Lookup(ctx, series)
-       if err != nil {
-               return nil, err
+       var seriesList, sl pbv1.SeriesList
+       for i := range result.segments {
+               tables = result.segments[i].Tables()
+               sl, err = result.segments[i].Lookup(ctx, series)
+               if err != nil {
+                       return nil, err
+               }
+               seriesList = append(seriesList, sl...)
+               result.tabs = append(result.tabs, tables...)
        }
+
        if len(seriesList) == 0 {
-               return sqr, nil
+               return &result, nil
        }
        result.qo = queryOptions{
                StreamQueryOptions: sqo,
@@ -84,7 +95,7 @@ func (s *stream) Query(ctx context.Context, sqo 
pbv1.StreamQueryOptions) (sqr pb
                result.qo.seriesToEntity[seriesList[i].ID] = 
seriesList[i].EntityValues
                result.qo.sortedSids = append(result.qo.sortedSids, 
seriesList[i].ID)
        }
-       if result.qo.elementFilter, err = indexSearch(sqo, result.tabWrappers, 
seriesList); err != nil {
+       if result.qo.elementFilter, err = indexSearch(sqo, result.tabs, 
seriesList); err != nil {
                return nil, err
        }
        result.tagNameIndex = make(map[string]partition.TagLocator)
@@ -105,7 +116,7 @@ func (s *stream) Query(ctx context.Context, sqo 
pbv1.StreamQueryOptions) (sqr pb
 
        if sqo.Order.Index == nil {
                result.orderByTS = true
-       } else if result.sortingIter, err = s.indexSort(sqo, 
result.tabWrappers, seriesList); err != nil {
+       } else if result.sortingIter, err = s.indexSort(sqo, result.tabs, 
seriesList); err != nil {
                return nil, err
        }
        if sqo.Order.Sort == modelv1.Sort_SORT_ASC || sqo.Order.Sort == 
modelv1.Sort_SORT_UNSPECIFIED {
@@ -127,10 +138,11 @@ type queryResult struct {
        sortingIter      itersort.Iterator[*index.ItemRef]
        tagNameIndex     map[string]partition.TagLocator
        schema           *databasev1.Stream
-       tabWrappers      []storage.TSTableWrapper[*tsTable]
+       tabs             []*tsTable
        elementIDsSorted []uint64
        data             []*blockCursor
        snapshots        []*snapshot
+       segments         []storage.Segment[*tsTable, option]
        qo               queryOptions
        loaded           bool
        orderByTS        bool
@@ -157,8 +169,8 @@ func (qr *queryResult) Pull(ctx context.Context) 
*pbv1.StreamResult {
 func (qr *queryResult) scanParts(ctx context.Context, qo queryOptions) error {
        var parts []*part
        var n int
-       for i := range qr.tabWrappers {
-               s := qr.tabWrappers[i].Table().currentSnapshot()
+       for i := range qr.tabs {
+               s := qr.tabs[i].currentSnapshot()
                if s == nil {
                        continue
                }
@@ -380,8 +392,8 @@ func (qr *queryResult) releaseBlockCursor() {
 
 func (qr *queryResult) Release() {
        qr.releaseParts()
-       for i := range qr.tabWrappers {
-               qr.tabWrappers[i].DecRef()
+       for i := range qr.segments {
+               qr.segments[i].DecRef()
        }
 }
 
@@ -504,14 +516,14 @@ func (qr *queryResult) mergeByTimestamp() 
*pbv1.StreamResult {
 }
 
 func indexSearch(sqo pbv1.StreamQueryOptions,
-       tabWrappers []storage.TSTableWrapper[*tsTable], seriesList 
pbv1.SeriesList,
+       tabs []*tsTable, seriesList pbv1.SeriesList,
 ) (posting.List, error) {
        if sqo.Filter == nil || sqo.Filter == logical.ENode {
                return nil, nil
        }
        result := roaring.NewPostingList()
-       for _, tw := range tabWrappers {
-               index := tw.Table().Index()
+       for _, tw := range tabs {
+               index := tw.Index()
                pl, err := index.Search(seriesList, sqo.Filter)
                if err != nil {
                        return nil, err
@@ -526,13 +538,13 @@ func indexSearch(sqo pbv1.StreamQueryOptions,
        return result, nil
 }
 
-func (s *stream) indexSort(sqo pbv1.StreamQueryOptions, tabWrappers 
[]storage.TSTableWrapper[*tsTable],
+func (s *stream) indexSort(sqo pbv1.StreamQueryOptions, tabs []*tsTable,
        seriesList pbv1.SeriesList,
 ) (itersort.Iterator[*index.ItemRef], error) {
        if sqo.Order == nil || sqo.Order.Index == nil {
                return nil, nil
        }
-       iters, err := s.buildItersByIndex(tabWrappers, seriesList, sqo)
+       iters, err := s.buildItersByIndex(tabs, seriesList, sqo)
        if err != nil {
                return nil, err
        }
@@ -540,7 +552,7 @@ func (s *stream) indexSort(sqo pbv1.StreamQueryOptions, 
tabWrappers []storage.TS
        return itersort.NewItemIter[*index.ItemRef](iters, desc), nil
 }
 
-func (s *stream) buildItersByIndex(tableWrappers 
[]storage.TSTableWrapper[*tsTable],
+func (s *stream) buildItersByIndex(tables []*tsTable,
        seriesList pbv1.SeriesList, sqo pbv1.StreamQueryOptions,
 ) (iters []itersort.Iterator[*index.ItemRef], err error) {
        indexRuleForSorting := sqo.Order.Index
@@ -548,13 +560,13 @@ func (s *stream) buildItersByIndex(tableWrappers 
[]storage.TSTableWrapper[*tsTab
                return nil, fmt.Errorf("only support one tag for sorting, but 
got %d", len(indexRuleForSorting.Tags))
        }
        sids := seriesList.IDs()
-       for _, tw := range tableWrappers {
+       for _, tw := range tables {
                var iter index.FieldIterator[*index.ItemRef]
                fieldKey := index.FieldKey{
                        IndexRuleID: indexRuleForSorting.GetMetadata().GetId(),
                        Analyzer:    indexRuleForSorting.GetAnalyzer(),
                }
-               iter, err = tw.Table().Index().Sort(sids, fieldKey, 
sqo.Order.Sort, sqo.TimeRange, sqo.MaxElementSize)
+               iter, err = tw.Index().Sort(sids, fieldKey, sqo.Order.Sort, 
sqo.TimeRange, sqo.MaxElementSize)
                if err != nil {
                        return nil, err
                }
diff --git a/banyand/stream/query_test.go b/banyand/stream/query_test.go
index 8cc5236d..5489884d 100644
--- a/banyand/stream/query_test.go
+++ b/banyand/stream/query_test.go
@@ -30,7 +30,6 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
-       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
@@ -308,7 +307,7 @@ func TestQueryResult(t *testing.T) {
                                                TagProjection: tagProjectionAll,
                                        },
                                }
-                               result.tabWrappers = 
[]storage.TSTableWrapper[*tsTable]{&tsTableWrapper{tst}}
+                               result.tabs = []*tsTable{tst}
                                defer result.Release()
                                if !tt.orderBySeries {
                                        result.orderByTS = true
@@ -401,22 +400,6 @@ func TestQueryResult(t *testing.T) {
        }
 }
 
-type tsTableWrapper struct {
-       *tsTable
-}
-
-func (t *tsTableWrapper) Table() *tsTable {
-       return t.tsTable
-}
-
-func (t *tsTableWrapper) GetTimeRange() timestamp.TimeRange {
-       return timestamp.TimeRange{}
-}
-
-func (t *tsTableWrapper) DecRef() {
-       t.tsTable.Close()
-}
-
 func emptyTagFamilies(size int) []pbv1.TagFamily {
        var values []*modelv1.TagValue
        for i := 0; i < size; i++ {
diff --git a/banyand/stream/write.go b/banyand/stream/write.go
index bcb1c3f9..2ba742ba 100644
--- a/banyand/stream/write.go
+++ b/banyand/stream/write.go
@@ -28,6 +28,7 @@ import (
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/index"
@@ -66,8 +67,9 @@ func (w *writeCallback) handle(dst 
map[string]*elementsInGroup, writeEvent *stre
        eg, ok := dst[gn]
        if !ok {
                eg = &elementsInGroup{
-                       tsdb:   tsdb,
-                       tables: make([]*elementsInTable, 0),
+                       tsdb:     tsdb,
+                       tables:   make([]*elementsInTable, 0),
+                       segments: make([]storage.Segment[*tsTable, option], 0),
                }
                dst[gn] = eg
        }
@@ -84,13 +86,26 @@ func (w *writeCallback) handle(dst 
map[string]*elementsInGroup, writeEvent *stre
        }
        shardID := common.ShardID(writeEvent.ShardId)
        if et == nil {
-               tsdb, err := tsdb.CreateTSTableIfNotExist(shardID, t)
+               var segment storage.Segment[*tsTable, option]
+               for _, seg := range eg.segments {
+                       if seg.GetTimeRange().Contains(ts) {
+                               segment = seg
+                               break
+                       }
+               }
+               if segment == nil {
+                       segment, err = tsdb.CreateSegmentIfNotExist(t)
+                       if err != nil {
+                               return nil, fmt.Errorf("cannot create segment: 
%w", err)
+                       }
+                       eg.segments = append(eg.segments, segment)
+               }
+               tstb, err := segment.CreateTSTableIfNotExist(shardID)
                if err != nil {
                        return nil, fmt.Errorf("cannot create ts table: %w", 
err)
                }
                et = &elementsInTable{
-                       timeRange: tsdb.GetTimeRange(),
-                       tsTable:   tsdb,
+                       timeRange: segment.GetTimeRange(),
+                       tsTable:   tstb,
                }
                eg.tables = append(eg.tables, et)
        }
@@ -239,23 +254,25 @@ func (w *writeCallback) Rev(message bus.Message) (resp 
bus.Message) {
        }
        for i := range groups {
                g := groups[i]
-               g.tsdb.Tick(g.latestTS)
                for j := range g.tables {
                        es := g.tables[j]
-                       es.tsTable.Table().mustAddElements(&es.elements)
+                       es.tsTable.mustAddElements(&es.elements)
                        if len(es.docs) > 0 {
-                               index := es.tsTable.Table().Index()
+                               index := es.tsTable.Index()
                                if err := index.Write(es.docs); err != nil {
                                        w.l.Error().Err(err).Msg("cannot write 
element index")
                                }
                        }
-                       es.tsTable.DecRef()
                }
-               if len(g.docs) > 0 {
-                       if err := g.tsdb.IndexDB().Write(g.docs); err != nil {
-                               w.l.Error().Err(err).Msg("cannot write series index")
-                       }
-               }
+               for _, segment := range g.segments {
+                       if len(g.docs) > 0 {
+                               if err := segment.IndexDB().Write(g.docs); err != nil {
+                                       w.l.Error().Err(err).Msg("cannot write series index")
+                               }
+                       }
+                       // Release the segment reference even when there are no
+                       // series documents to index; otherwise it leaks.
+                       segment.DecRef()
+               }
+               g.tsdb.Tick(g.latestTS)
        }
        return
 }

Reply via email to