This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push: new d5f5dbfc Change the storage layer (#492) d5f5dbfc is described below commit d5f5dbfc0b6abf93f468321c57c193a2d74369e9 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Mon Jul 22 21:04:52 2024 +0800 Change the storage layer (#492) Signed-off-by: Gao Hongtao <hanahm...@gmail.com> --- CHANGES.md | 2 + banyand/internal/storage/README.md | 3 +- banyand/internal/storage/index.go | 251 ++---------------------------- banyand/internal/storage/index_test.go | 59 +------ banyand/internal/storage/rotation.go | 64 +++----- banyand/internal/storage/rotation_test.go | 26 +--- banyand/internal/storage/segment.go | 202 +++++++++++++++--------- banyand/internal/storage/shard.go | 57 ++++--- banyand/internal/storage/storage.go | 22 +-- banyand/internal/storage/tsdb.go | 143 +++-------------- banyand/measure/datapoints.go | 6 +- banyand/measure/query.go | 143 ++++++++++------- banyand/measure/write.go | 55 +++++-- banyand/stream/benchmark_test.go | 51 ++---- banyand/stream/elements.go | 6 +- banyand/stream/query.go | 54 ++++--- banyand/stream/query_test.go | 19 +-- banyand/stream/write.go | 39 +++-- 18 files changed, 439 insertions(+), 763 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index d3362043..58b59f87 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -7,6 +7,8 @@ Release Notes. ### File System Changes - Bump up the version of the file system to 1.1.0 which is not compatible with the previous version. +- Move the series index into segment. +- Swap the segment and the shard. ### Features diff --git a/banyand/internal/storage/README.md b/banyand/internal/storage/README.md index 77f177c6..f55c26b7 100644 --- a/banyand/internal/storage/README.md +++ b/banyand/internal/storage/README.md @@ -1,2 +1,3 @@ # File Version Compatibility Information -The current file version is 1.0.0, compatible with versions specified in version.yml. + +The current file version is 1.1.0, compatible with versions specified in version.yml. diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go index d5c935fd..f3fa2f04 100644 --- a/banyand/internal/storage/index.go +++ b/banyand/internal/storage/index.go @@ -19,14 +19,7 @@ package storage import ( "context" - "fmt" "path" - "path/filepath" - "sort" - "strconv" - "strings" - "sync" - "time" "github.com/pkg/errors" "go.uber.org/multierr" @@ -40,33 +33,28 @@ import ( pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/query" "github.com/apache/skywalking-banyandb/pkg/query/logical" - "github.com/apache/skywalking-banyandb/pkg/timestamp" ) -func (d *database[T, O]) IndexDB() IndexDB { - return d.indexController.getHot() +func (s *segment[T, O]) IndexDB() IndexDB { + return s.index } -func (d *database[T, O]) Lookup(ctx context.Context, series []*pbv1.Series) (pbv1.SeriesList, error) { - return d.indexController.searchPrimary(ctx, series) +func (s *segment[T, O]) Lookup(ctx context.Context, series []*pbv1.Series) (pbv1.SeriesList, error) { + return s.index.searchPrimary(ctx, series) } type seriesIndex struct { - startTime time.Time - store index.SeriesStore - l *logger.Logger - path string + store index.SeriesStore + l *logger.Logger } -func newSeriesIndex(ctx context.Context, path string, startTime time.Time, flushTimeoutSeconds int64) (*seriesIndex, error) { +func newSeriesIndex(ctx context.Context, root string, flushTimeoutSeconds int64) (*seriesIndex, error) { si := &seriesIndex{ - path: path, - startTime: startTime, - l: logger.Fetch(ctx, "series_index"), + l: logger.Fetch(ctx, "series_index"), } var err error if si.store, err = inverted.NewStore(inverted.StoreOpts{ - Path: path, + Path: path.Join(root, "sidx"), Logger: si.l, BatchWaitSec: flushTimeoutSeconds, }); err != nil { @@ -301,224 +289,3 @@ func appendSeriesList(dest, src pbv1.SeriesList, target common.SeriesID) pbv1.Se func (s *seriesIndex) Close() error { return s.store.Close() } - -type seriesIndexController[T TSTable, O any] struct { - clock timestamp.Clock - hot *seriesIndex - standby *seriesIndex - l *logger.Logger - location string - opts TSDBOpts[T, O] - standbyLiveTime time.Duration - sync.RWMutex -} - -func newSeriesIndexController[T TSTable, O any]( - ctx context.Context, - opts TSDBOpts[T, O], -) (*seriesIndexController[T, O], error) { - l := logger.Fetch(ctx, "seriesIndexController") - clock, ctx := timestamp.GetClock(ctx) - var standbyLiveTime time.Duration - switch opts.TTL.Unit { - case HOUR: - standbyLiveTime = time.Hour - case DAY: - standbyLiveTime = 24 * time.Hour - default: - } - sic := &seriesIndexController[T, O]{ - opts: opts, - clock: clock, - standbyLiveTime: standbyLiveTime, - location: filepath.Clean(opts.Location), - l: l, - } - idxName, err := sic.loadIdx() - if err != nil { - return nil, err - } - switch len(idxName) { - case 0: - if sic.hot, err = sic.newIdx(ctx, clock.Now()); err != nil { - return nil, err - } - case 1: - if sic.hot, err = sic.openIdx(ctx, idxName[0]); err != nil { - return nil, err - } - case 2: - if sic.hot, err = sic.openIdx(ctx, idxName[0]); err != nil { - return nil, err - } - if sic.standby, err = sic.openIdx(ctx, idxName[1]); err != nil { - return nil, err - } - default: - return nil, errors.New("unexpected series index count") - } - return sic, nil -} - -func (sic *seriesIndexController[T, O]) getHot() *seriesIndex { - sic.RLock() - defer sic.RUnlock() - return sic.hot -} - -func (sic *seriesIndexController[T, O]) loadIdx() ([]string, error) { - idxName := make([]string, 0) - if err := walkDir( - sic.location, - "idx", - func(suffix string) error { - idxName = append(idxName, "idx-"+suffix) - return nil - }); err != nil { - return nil, err - } - sort.StringSlice(idxName).Sort() - if len(idxName) > 2 { - redundantIdx := idxName[:len(idxName)-2] - for i := range redundantIdx { - lfs.MustRMAll(filepath.Join(sic.location, redundantIdx[i])) - } - idxName = idxName[len(idxName)-2:] - } - return idxName, nil -} - -func (sic *seriesIndexController[T, O]) newIdx(ctx context.Context, now time.Time) (*seriesIndex, error) { - ts := sic.opts.TTL.Unit.standard(now) - return sic.openIdx(ctx, fmt.Sprintf("idx-%016x", ts.UnixNano())) -} - -func (sic *seriesIndexController[T, O]) newNextIdx(ctx context.Context, now time.Time) (*seriesIndex, error) { - ts := sic.opts.TTL.Unit.standard(now) - ts = ts.Add(sic.standbyLiveTime) - return sic.openIdx(ctx, fmt.Sprintf("idx-%016x", ts.UnixNano())) -} - -func (sic *seriesIndexController[T, O]) openIdx(ctx context.Context, name string) (*seriesIndex, error) { - p := path.Join(sic.location, name) - if ts, ok := strings.CutPrefix(name, "idx-"); ok { - t, err := strconv.ParseInt(ts, 16, 64) - if err != nil { - return nil, err - } - - return newSeriesIndex(ctx, p, sic.opts.TTL.Unit.standard(time.Unix(0, t)), sic.opts.SeriesIndexFlushTimeoutSeconds) - } - return nil, errors.New("unexpected series index name") -} - -func (sic *seriesIndexController[T, O]) run(now, deadline time.Time) (err error) { - ctx := context.WithValue(context.Background(), logger.ContextKey, sic.l) - if _, err := sic.loadIdx(); err != nil { - sic.l.Warn().Err(err).Msg("fail to clear redundant series index") - } - - sic.Lock() - defer sic.Unlock() - if sic.hot.startTime.Compare(deadline) <= 0 { - return sic.handleStandby(ctx, now, deadline) - } - - if sic.standby != nil { - return nil - } - liveTime := sic.hot.startTime.Sub(deadline) - if liveTime <= 0 || liveTime > sic.standbyLiveTime { - return nil - } - return sic.createStandby(ctx, now, deadline) -} - -func (sic *seriesIndexController[T, O]) handleStandby(ctx context.Context, now, deadline time.Time) error { - sic.l.Info().Time("deadline", deadline).Msg("start to swap series index") - - if sic.standby == nil { - var err error - sic.standby, err = sic.newIdx(ctx, now) - if err != nil { - return err - } - } - - standby := sic.hot - sic.hot = sic.standby - sic.standby = nil - - if err := standby.Close(); err != nil { - sic.l.Warn().Err(err).Msg("fail to close standby series index") - } - - lfs.MustRMAll(standby.path) - sic.l.Info().Str("path", standby.path).Msg("dropped series index") - lfs.SyncPath(sic.location) - - return nil -} - -func (sic *seriesIndexController[T, O]) createStandby(ctx context.Context, now, deadline time.Time) error { - if sic.standby != nil { - return nil - } - sic.l.Info().Time("deadline", deadline).Msg("start to create standby series index") - standby, err := sic.newNextIdx(ctx, now) - if err != nil { - sic.l.Error().Err(err).Msg("fail to create standby series index") - return err - } - - sic.standby = standby - return nil -} - -func (sic *seriesIndexController[T, O]) Write(docs index.Documents) error { - sic.RLock() - defer sic.RUnlock() - if sic.standby != nil { - return sic.standby.Write(docs) - } - return sic.hot.Write(docs) -} - -func (sic *seriesIndexController[T, O]) searchPrimary(ctx context.Context, series []*pbv1.Series) (pbv1.SeriesList, error) { - sic.RLock() - defer sic.RUnlock() - - sl, err := sic.hot.searchPrimary(ctx, series) - if err != nil { - return nil, err - } - if len(sl) > 0 || sic.standby == nil { - return sl, nil - } - return sic.standby.searchPrimary(ctx, series) -} - -func (sic *seriesIndexController[T, O]) Search(ctx context.Context, series []*pbv1.Series, - filter index.Filter, order *pbv1.OrderBy, preloadSize int, -) (pbv1.SeriesList, error) { - sic.RLock() - defer sic.RUnlock() - - sl, err := sic.hot.Search(ctx, series, filter, order, preloadSize) - if err != nil { - return nil, err - } - if len(sl) > 0 || sic.standby == nil { - return sl, nil - } - return sic.standby.Search(ctx, series, filter, order, preloadSize) -} - -func (sic *seriesIndexController[T, O]) Close() error { - sic.Lock() - defer sic.Unlock() - if sic.standby != nil { - return multierr.Combine(sic.hot.Close(), sic.standby.Close()) - } - return sic.hot.Close() -} diff --git a/banyand/internal/storage/index_test.go b/banyand/internal/storage/index_test.go index 43bd92a9..e28a6f54 100644 --- a/banyand/internal/storage/index_test.go +++ b/banyand/internal/storage/index_test.go @@ -20,10 +20,7 @@ package storage import ( "context" "fmt" - "os" - "path" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -41,7 +38,7 @@ var testSeriesPool pbv1.SeriesPool func TestSeriesIndex_Primary(t *testing.T) { ctx := context.Background() path, fn := setUp(require.New(t)) - si, err := newSeriesIndex(ctx, path, time.Now(), 0) + si, err := newSeriesIndex(ctx, path, 0) require.NoError(t, err) defer func() { require.NoError(t, si.Close()) @@ -72,7 +69,7 @@ func TestSeriesIndex_Primary(t *testing.T) { require.NoError(t, si.Write(docs)) // Restart the index require.NoError(t, si.Close()) - si, err = newSeriesIndex(ctx, path, time.Now(), 0) + si, err = newSeriesIndex(ctx, path, 0) require.NoError(t, err) tests := []struct { name string @@ -185,55 +182,3 @@ func setUp(t *require.Assertions) (tempDir string, deferFunc func()) { tempDir, deferFunc = test.Space(t) return tempDir, deferFunc } - -func TestSeriesIndexController(t *testing.T) { - ttl := IntervalRule{ - Unit: DAY, - Num: 3, - } - t.Run("Test setup", func(t *testing.T) { - ctx := context.Background() - tmpDir, dfFn, err := test.NewSpace() - require.NoError(t, err) - defer dfFn() - - opts := TSDBOpts[TSTable, any]{ - Location: tmpDir, - TTL: ttl, - } - - sic, err := newSeriesIndexController(ctx, opts) - assert.NoError(t, err) - assert.NotNil(t, sic) - idxNames := make([]string, 0) - walkDir(tmpDir, "idx-", func(suffix string) error { - idxNames = append(idxNames, suffix) - return nil - }) - assert.Equal(t, 1, len(idxNames)) - require.NoError(t, sic.Close()) - sic, err = newSeriesIndexController(ctx, opts) - assert.NoError(t, err) - assert.NotNil(t, sic) - idxNames = idxNames[:0] - walkDir(tmpDir, "idx-", func(suffix string) error { - idxNames = append(idxNames, suffix) - return nil - }) - assert.Equal(t, 1, len(idxNames)) - require.NoError(t, sic.Close()) - - require.NoError(t, os.MkdirAll(path.Join(tmpDir, fmt.Sprintf("idx-%016x", time.Now().UnixNano()-20000)), 0o755)) - require.NoError(t, os.MkdirAll(path.Join(tmpDir, fmt.Sprintf("idx-%016x", time.Now().UnixNano()-10000)), 0o755)) - sic, err = newSeriesIndexController(ctx, opts) - assert.NoError(t, err) - assert.NotNil(t, sic) - idxNames = idxNames[:0] - walkDir(tmpDir, "idx-", func(suffix string) error { - idxNames = append(idxNames, suffix) - return nil - }) - assert.Equal(t, 2, len(idxNames)) - require.NoError(t, sic.Close()) - }) -} diff --git a/banyand/internal/storage/rotation.go b/banyand/internal/storage/rotation.go index 359e937d..9850dc62 100644 --- a/banyand/internal/storage/rotation.go +++ b/banyand/internal/storage/rotation.go @@ -51,35 +51,30 @@ func (d *database[T, O]) startRotationTask() error { defer d.rotationProcessOn.Store(false) t := time.Unix(0, ts) rt.run(t, d.logger) - shardsRef := d.sLst.Load() - if shardsRef == nil { - return - } - for _, s := range *shardsRef { - func(s *shard[T, O]) { - ss := s.segmentController.segments() - if len(ss) == 0 { - return + func() { + ss := d.segmentController.segments() + if len(ss) == 0 { + return + } + defer func() { + for i := 0; i < len(ss); i++ { + ss[i].DecRef() } - defer func() { - for i := 0; i < len(ss); i++ { - ss[i].DecRef() - } - }() - latest := ss[len(ss)-1] - gap := latest.End.UnixNano() - ts - // gap <=0 means the event is from the future - // the segment will be created by a written event directly - if gap <= 0 || gap > newSegmentTimeGap { - return - } - d.logger.Info().Time("segment_start", s.segmentController.segmentSize.nextTime(t)).Time("event_time", t).Msg("create new segment") - _, err := s.segmentController.create(s.segmentController.segmentSize.nextTime(t)) - if err != nil { - d.logger.Error().Err(err).Msgf("failed to create new segment.") - } - }(s) - } + }() + latest := ss[len(ss)-1] + gap := latest.End.UnixNano() - ts + // gap <=0 means the event is from the future + // the segment will be created by a written event directly + if gap <= 0 || gap > newSegmentTimeGap { + return + } + start := d.segmentController.opts.SegmentInterval.nextTime(t) + d.logger.Info().Time("segment_start", start).Time("event_time", t).Msg("create new segment") + _, err := d.segmentController.create(start) + if err != nil { + d.logger.Error().Err(err).Msgf("failed to create new segment.") + } + }() }(ts) } }(rt) @@ -115,19 +110,8 @@ func (rc *retentionTask[T, O]) run(now time.Time, l *logger.Logger) bool { <-rc.running }() - shardList := rc.database.sLst.Load() - if shardList == nil { - return false - } deadline := now.Add(-rc.duration) - - for _, shard := range *shardList { - if err := shard.segmentController.remove(deadline); err != nil { - l.Error().Err(err) - } - } - stdDeadline := rc.database.opts.TTL.Unit.standard(deadline) - if err := rc.database.indexController.run(now, stdDeadline); err != nil { + if err := rc.database.segmentController.remove(deadline); err != nil { l.Error().Err(err) } return true diff --git a/banyand/internal/storage/rotation_test.go b/banyand/internal/storage/rotation_test.go index 869e5f63..2c893b59 100644 --- a/banyand/internal/storage/rotation_test.go +++ b/banyand/internal/storage/rotation_test.go @@ -62,7 +62,6 @@ func TestRetention(t *testing.T) { tsdb, c, segCtrl, dfFn := setUpDB(t) defer dfFn() ts := c.Now() - indexHotStartTime := tsdb.indexController.hot.startTime for i := 0; i < 4; i++ { ts = ts.Add(23 * time.Hour) @@ -84,12 +83,6 @@ func TestRetention(t *testing.T) { assert.Eventually(t, func() bool { return len(segCtrl.segments()) == 4 }, flags.EventuallyTimeout, time.Millisecond, "wait for the 1st segment to be deleted") - assert.Eventually(t, func() bool { - tsdb.indexController.RLock() - defer tsdb.indexController.RUnlock() - ttl := tsdb.indexController.hot.startTime.Sub(indexHotStartTime) - return ttl >= 3*24*time.Hour - }, flags.EventuallyTimeout, time.Millisecond, "wait for the index to be updated") }) t.Run("keep the segment volume stable", func(t *testing.T) { @@ -131,15 +124,6 @@ func TestRetention(t *testing.T) { ct.Errorf("expect the segment number never to exceed 4, got %d", len(ss)) return } - tsdb.indexController.RLock() - indexStartTime := tsdb.indexController.hot.startTime - defer tsdb.indexController.RUnlock() - if ts.Sub(indexStartTime) > 3*24*time.Hour { - ct.Errorf("expect the index to be updated, current time %s, index start time %s", - ts.Format(time.RFC3339), indexStartTime.Format(time.RFC3339)) - return - } - t.Logf("current time: %s, index start time: %s", ts.Format(time.RFC3339), indexStartTime.Format(time.RFC3339)) if tsdb.rotationProcessOn.Load() { ct.Errorf("expect the rotation process to be off") } @@ -166,15 +150,13 @@ func setUpDB(t *testing.T) (*database[*MockTSTable, any], timestamp.MockClock, * tsdb, err := OpenTSDB(ctx, TSDBOpts) require.NoError(t, err) - tsTable, err := tsdb.CreateTSTableIfNotExist(0, ts) + seg, err := tsdb.CreateSegmentIfNotExist(ts) require.NoError(t, err) - tsTable.DecRef() + defer seg.DecRef() db := tsdb.(*database[*MockTSTable, any]) - shard, ok := db.getShard(0) - require.True(t, ok) - require.Equal(t, len(shard.segmentController.segments()), 1) - return db, mc, shard.segmentController, func() { + require.Equal(t, len(db.segmentController.segments()), 1) + return db, mc, db.segmentController, func() { tsdb.Close() defFn() } diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go index 8138244d..9153ce3e 100644 --- a/banyand/internal/storage/segment.go +++ b/banyand/internal/storage/segment.go @@ -40,46 +40,80 @@ import ( // ErrExpiredData is returned when the data is expired. var ErrExpiredData = errors.New("expired data") -type segment[T TSTable] struct { - tsTable T +type segment[T TSTable, O any] struct { l *logger.Logger + index *seriesIndex + sLst atomic.Pointer[[]*shard[T]] position common.Position timestamp.TimeRange - path string + location string suffix string + opts TSDBOpts[T, O] refCount int32 mustBeDeleted uint32 id segmentID } -func openSegment[T TSTable](ctx context.Context, startTime, endTime time.Time, path, suffix string, - segmentSize IntervalRule, tsTable T, p common.Position, -) (s *segment[T], err error) { +func (sc *segmentController[T, O]) openSegment(ctx context.Context, startTime, endTime time.Time, path, suffix string, + p common.Position, opts TSDBOpts[T, O], +) (s *segment[T, O], err error) { suffixInteger, err := strconv.Atoi(suffix) if err != nil { return nil, err } - id := generateSegID(segmentSize.Unit, suffixInteger) - timeRange := timestamp.NewSectionTimeRange(startTime, endTime) - s = &segment[T]{ + id := generateSegID(sc.opts.SegmentInterval.Unit, suffixInteger) + sir, err := newSeriesIndex(ctx, path, sc.opts.SeriesIndexFlushTimeoutSeconds) + if err != nil { + return nil, errors.Wrap(errOpenDatabase, errors.WithMessage(err, "create series index controller failed").Error()) + } + s = &segment[T, O]{ id: id, - path: path, + location: path, suffix: suffix, - TimeRange: timeRange, + TimeRange: timestamp.NewSectionTimeRange(startTime, endTime), position: p, - tsTable: tsTable, refCount: 1, + index: sir, + opts: opts, } - l := logger.Fetch(ctx, s.String()) - s.l = l - return s, nil + s.l = logger.Fetch(ctx, s.String()) + return s, s.loadShards() } -func (s *segment[T]) incRef() { +func (s *segment[T, O]) loadShards() error { + return walkDir(s.location, shardPathPrefix, func(suffix string) error { + shardID, err := strconv.Atoi(suffix) + if err != nil { + return err + } + if shardID >= int(s.opts.ShardNum) { + return nil + } + s.l.Info().Int("shard_id", shardID).Msg("loaded a existed shard") + _, err = s.CreateTSTableIfNotExist(common.ShardID(shardID)) + return err + }) +} + +func (s *segment[T, O]) GetTimeRange() timestamp.TimeRange { + return s.TimeRange +} + +func (s *segment[T, O]) Tables() (tt []T) { + sLst := s.sLst.Load() + if sLst != nil { + for _, s := range *sLst { + tt = append(tt, s.table) + } + } + return tt +} + +func (s *segment[T, O]) incRef() { atomic.AddInt32(&s.refCount, 1) } -func (s *segment[T]) DecRef() { +func (s *segment[T, O]) DecRef() { n := atomic.AddInt32(&s.refCount, -1) if n > 0 { return @@ -87,11 +121,18 @@ func (s *segment[T]) DecRef() { deletePath := "" if atomic.LoadUint32(&s.mustBeDeleted) != 0 { - deletePath = s.path + deletePath = s.location + } + + if err := s.index.Close(); err != nil { + s.l.Panic().Err(err).Msg("failed to close the series index") } - if err := s.tsTable.Close(); err != nil { - s.l.Panic().Err(err).Msg("failed to close tsTable") + sLst := s.sLst.Load() + if sLst != nil { + for _, s := range *sLst { + s.close() + } } if deletePath != "" { @@ -99,52 +140,75 @@ func (s *segment[T]) DecRef() { } } -func (s *segment[T]) Table() T { - return s.tsTable +func (s *segment[T, O]) delete() { + atomic.StoreUint32(&s.mustBeDeleted, 1) + s.DecRef() } -func (s *segment[T]) GetTimeRange() timestamp.TimeRange { - return s.TimeRange +func (s *segment[T, O]) CreateTSTableIfNotExist(id common.ShardID) (T, error) { + if s, ok := s.getShard(id); ok { + return s.table, nil + } + ctx := context.WithValue(context.Background(), logger.ContextKey, s.l) + ctx = common.SetPosition(ctx, func(_ common.Position) common.Position { + return s.position + }) + so, err := s.openShard(ctx, id) + if err != nil { + var t T + return t, err + } + var shardList []*shard[T] + sLst := s.sLst.Load() + if sLst != nil { + shardList = *sLst + } + shardList = append(shardList, so) + s.sLst.Store(&shardList) + return so.table, nil } -func (s *segment[T]) delete() { - atomic.StoreUint32(&s.mustBeDeleted, 1) - s.DecRef() +func (s *segment[T, O]) getShard(shardID common.ShardID) (*shard[T], bool) { + sLst := s.sLst.Load() + if sLst != nil { + for _, s := range *sLst { + if s.id == shardID { + return s, true + } + } + } + return nil, false } -func (s *segment[T]) String() string { +func (s *segment[T, O]) String() string { return "SegID-" + s.suffix } type segmentController[T TSTable, O any] struct { - clock timestamp.Clock - option O - l *logger.Logger - tsTableCreator TSTableCreator[T, O] - position common.Position - location string - lst []*segment[T] - segmentSize IntervalRule - deadline atomic.Int64 + clock timestamp.Clock + l *logger.Logger + position common.Position + location string + lst []*segment[T, O] + opts TSDBOpts[T, O] + deadline atomic.Int64 sync.RWMutex } func newSegmentController[T TSTable, O any](ctx context.Context, location string, - segmentSize IntervalRule, l *logger.Logger, tsTableCreator TSTableCreator[T, O], option O, + l *logger.Logger, opts TSDBOpts[T, O], ) *segmentController[T, O] { clock, _ := timestamp.GetClock(ctx) return &segmentController[T, O]{ - location: location, - segmentSize: segmentSize, - l: l, - clock: clock, - position: common.GetPosition(ctx), - tsTableCreator: tsTableCreator, - option: option, + location: location, + opts: opts, + l: l, + clock: clock, + position: common.GetPosition(ctx), } } -func (sc *segmentController[T, O]) selectTSTables(timeRange timestamp.TimeRange) (tt []TSTableWrapper[T]) { +func (sc *segmentController[T, O]) selectSegments(timeRange timestamp.TimeRange) (tt []Segment[T, O]) { sc.RLock() defer sc.RUnlock() last := len(sc.lst) - 1 @@ -158,7 +222,7 @@ func (sc *segmentController[T, O]) selectTSTables(timeRange timestamp.TimeRange) return tt } -func (sc *segmentController[T, O]) createTSTable(ts time.Time) (TSTableWrapper[T], error) { +func (sc *segmentController[T, O]) createSegment(ts time.Time) (*segment[T, O], error) { // Before the first remove old segment run, any segment should be created. if sc.deadline.Load() > ts.UnixNano() { return nil, ErrExpiredData @@ -171,10 +235,10 @@ func (sc *segmentController[T, O]) createTSTable(ts time.Time) (TSTableWrapper[T return s, nil } -func (sc *segmentController[T, O]) segments() (ss []*segment[T]) { +func (sc *segmentController[T, O]) segments() (ss []*segment[T, O]) { sc.RLock() defer sc.RUnlock() - r := make([]*segment[T], len(sc.lst)) + r := make([]*segment[T, O], len(sc.lst)) for i := range sc.lst { sc.lst[i].incRef() r[i] = sc.lst[i] @@ -182,8 +246,8 @@ func (sc *segmentController[T, O]) segments() (ss []*segment[T]) { return r } -func (sc *segmentController[T, O]) Format(tm time.Time) string { - switch sc.segmentSize.Unit { +func (sc *segmentController[T, O]) format(tm time.Time) string { + switch sc.opts.SegmentInterval.Unit { case HOUR: return tm.Format(hourFormat) case DAY: @@ -192,8 +256,8 @@ func (sc *segmentController[T, O]) Format(tm time.Time) string { panic("invalid interval unit") } -func (sc *segmentController[T, O]) Parse(value string) (time.Time, error) { - switch sc.segmentSize.Unit { +func (sc *segmentController[T, O]) parse(value string) (time.Time, error) { + switch sc.opts.SegmentInterval.Unit { case HOUR: return time.ParseInLocation(hourFormat, value, time.Local) case DAY: @@ -206,8 +270,8 @@ func (sc *segmentController[T, O]) open() error { sc.Lock() defer sc.Unlock() emptySegments := make([]string, 0) - err := loadSegments(sc.location, segPathPrefix, sc, sc.segmentSize, func(start, end time.Time) error { - suffix := sc.Format(start) + err := loadSegments(sc.location, segPathPrefix, sc, sc.opts.SegmentInterval, func(start, end time.Time) error { + suffix := sc.format(start) segmentPath := path.Join(sc.location, fmt.Sprintf(segTemplate, suffix)) metadataPath := path.Join(segmentPath, metadataFilename) version, err := lfs.Read(metadataPath) @@ -237,7 +301,7 @@ func (sc *segmentController[T, O]) open() error { return err } -func (sc *segmentController[T, O]) create(start time.Time) (*segment[T], error) { +func (sc *segmentController[T, O]) create(start time.Time) (*segment[T, O], error) { sc.Lock() defer sc.Unlock() last := len(sc.lst) - 1 @@ -247,8 +311,8 @@ func (sc *segmentController[T, O]) create(start time.Time) (*segment[T], error) return s, nil } } - start = sc.segmentSize.Unit.standard(start) - var next *segment[T] + start = sc.opts.SegmentInterval.Unit.standard(start) + var next *segment[T, O] for _, s := range sc.lst { if s.Contains(start.UnixNano()) { return s, nil @@ -257,14 +321,14 @@ func (sc *segmentController[T, O]) create(start time.Time) (*segment[T], error) next = s } } - stdEnd := sc.segmentSize.nextTime(start) + stdEnd := sc.opts.SegmentInterval.nextTime(start) var end time.Time if next != nil && next.Start.Before(stdEnd) { end = next.Start } else { end = stdEnd } - segPath := path.Join(sc.location, fmt.Sprintf(segTemplate, sc.Format(start))) + segPath := path.Join(sc.location, fmt.Sprintf(segTemplate, sc.format(start))) lfs.MkdirPanicIfExist(segPath, dirPerm) data := []byte(currentVersion) metadataPath := filepath.Join(segPath, metadataFilename) @@ -288,16 +352,12 @@ func (sc *segmentController[T, O]) sortLst() { }) } -func (sc *segmentController[T, O]) load(start, end time.Time, root string) (seg *segment[T], err error) { - suffix := sc.Format(start) +func (sc *segmentController[T, O]) load(start, end time.Time, root string) (seg *segment[T, O], err error) { + suffix := sc.format(start) segPath := path.Join(root, fmt.Sprintf(segTemplate, suffix)) - var tsTable T p := sc.position p.Segment = suffix - if tsTable, err = sc.tsTableCreator(lfs, segPath, p, sc.l, timestamp.NewSectionTimeRange(start, end), sc.option); err != nil { - return nil, err - } - seg, err = openSegment[T](context.WithValue(context.Background(), logger.ContextKey, sc.l), start, end, segPath, suffix, sc.segmentSize, tsTable, p) + seg, err = sc.openSegment(context.WithValue(context.Background(), logger.ContextKey, sc.l), start, end, segPath, suffix, p, sc.opts) if err != nil { return nil, err } @@ -343,17 +403,13 @@ func (sc *segmentController[T, O]) close() { sc.lst = sc.lst[:0] } -type parser interface { - Parse(value string) (time.Time, error) -} - -func loadSegments(root, prefix string, parser parser, intervalRule IntervalRule, loadFn func(start, end time.Time) error) error { +func loadSegments[T TSTable, O any](root, prefix string, parser *segmentController[T, O], intervalRule IntervalRule, loadFn func(start, end time.Time) error) error { var startTimeLst []time.Time if err := walkDir( root, prefix, func(suffix string) error { - startTime, err := parser.Parse(suffix) + startTime, err := parser.parse(suffix) if err != nil { return err } diff --git a/banyand/internal/storage/shard.go b/banyand/internal/storage/shard.go index d4e32023..a3dc8e6a 100644 --- a/banyand/internal/storage/shard.go +++ b/banyand/internal/storage/shard.go @@ -22,48 +22,45 @@ import ( "fmt" "path" "strconv" - "sync" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/timestamp" ) -type shard[T TSTable, O any] struct { - l *logger.Logger - segmentController *segmentController[T, O] - position common.Position - closeOnce sync.Once - id common.ShardID +type shard[T TSTable] struct { + table T + l *logger.Logger + timeRange timestamp.TimeRange + location string + id common.ShardID } -func (d *database[T, O]) openShard(ctx context.Context, id common.ShardID) (*shard[T, O], error) { - location := path.Join(d.location, fmt.Sprintf(shardTemplate, int(id))) +func (s *segment[T, O]) openShard(ctx context.Context, id common.ShardID) (*shard[T], error) { + location := path.Join(s.location, fmt.Sprintf(shardTemplate, int(id))) lfs.MkdirIfNotExist(location, dirPerm) l := logger.Fetch(ctx, "shard"+strconv.Itoa(int(id))) l.Info().Int("shard_id", int(id)).Str("path", location).Msg("creating a shard") - shardCtx := context.WithValue(ctx, logger.ContextKey, l) - shardCtx = common.SetPosition(shardCtx, func(p common.Position) common.Position { - p.Shard = strconv.Itoa(int(id)) - return p - }) - - s := &shard[T, O]{ - id: id, - l: l, - position: common.GetPosition(shardCtx), - segmentController: newSegmentController[T](shardCtx, location, - d.opts.SegmentInterval, l, - d.opts.TSTableCreator, d.opts.Option), - } - var err error - if err = s.segmentController.open(); err != nil { + p := common.GetPosition(ctx) + p.Shard = strconv.Itoa(int(id)) + t, err := s.opts.TSTableCreator(lfs, location, p, l, s.TimeRange, s.opts.Option) + if err != nil { return nil, err } - return s, nil + + return &shard[T]{ + id: id, + l: l, + table: t, + timeRange: s.TimeRange, + location: location, + }, nil +} + +func (s *shard[T]) Table() T { + return s.table } -func (s *shard[T, O]) close() { - s.closeOnce.Do(func() { - s.segmentController.close() - }) +func (s *shard[T]) close() error { + return s.table.Close() } diff --git a/banyand/internal/storage/storage.go b/banyand/internal/storage/storage.go index 935dbe0e..7ab4dc3c 100644 --- a/banyand/internal/storage/storage.go +++ b/banyand/internal/storage/storage.go @@ -72,11 +72,19 @@ type IndexDB interface { // TSDB allows listing and getting shard details. type TSDB[T TSTable, O any] interface { io.Closer + CreateSegmentIfNotExist(ts time.Time) (Segment[T, O], error) + SelectSegments(timeRange timestamp.TimeRange) []Segment[T, O] + Tick(ts int64) +} + +// Segment is a time range of data. +type Segment[T TSTable, O any] interface { + DecRef() + GetTimeRange() timestamp.TimeRange + CreateTSTableIfNotExist(shardID common.ShardID) (T, error) + Tables() []T Lookup(ctx context.Context, series []*pbv1.Series) (pbv1.SeriesList, error) - CreateTSTableIfNotExist(shardID common.ShardID, ts time.Time) (TSTableWrapper[T], error) - SelectTSTables(timeRange timestamp.TimeRange) []TSTableWrapper[T] IndexDB() IndexDB - Tick(ts int64) } // TSTable is time series table. @@ -84,14 +92,6 @@ type TSTable interface { io.Closer } -// TSTableWrapper is a wrapper of TSTable. -// It is used to manage the reference count of TSTable. -type TSTableWrapper[T TSTable] interface { - DecRef() - Table() T - GetTimeRange() timestamp.TimeRange -} - // TSTableCreator creates a TSTable. type TSTableCreator[T TSTable, O any] func(fileSystem fs.FileSystem, root string, position common.Position, l *logger.Logger, timeRange timestamp.TimeRange, option O) (T, error) diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go index ba53b107..39ed5ba3 100644 --- a/banyand/internal/storage/tsdb.go +++ b/banyand/internal/storage/tsdb.go @@ -20,7 +20,6 @@ package storage import ( "context" "path/filepath" - "strconv" "strings" "sync" "sync/atomic" @@ -68,16 +67,15 @@ func generateSegID(unit IntervalUnit, suffix int) segmentID { } type database[T TSTable, O any] struct { - lock fs.File - logger *logger.Logger - indexController *seriesIndexController[T, O] - scheduler *timestamp.Scheduler - sLst atomic.Pointer[[]*shard[T, O]] - tsEventCh chan int64 - p common.Position - location string - opts TSDBOpts[T, O] - latestTickTime atomic.Int64 + lock fs.File + logger *logger.Logger + scheduler *timestamp.Scheduler + tsEventCh chan int64 + segmentController *segmentController[T, O] + p common.Position + location string + opts TSDBOpts[T, O] + latestTickTime atomic.Int64 sync.RWMutex rotationProcessOn atomic.Bool } @@ -86,18 +84,13 @@ func (d *database[T, O]) Close() error { d.Lock() defer d.Unlock() d.scheduler.Close() - sLst := d.sLst.Load() - if sLst != nil { - for _, s := range *sLst { - s.close() - } - } close(d.tsEventCh) + d.segmentController.close() d.lock.Close() if err := lfs.DeleteFile(d.lock.Path()); err != nil { logger.Panicf("cannot delete lock file %s: %s", d.lock.Path(), err) } - return d.indexController.Close() + return nil } // OpenTSDB returns a new tsdb runtime. This constructor will create a new database if it's absent, @@ -112,21 +105,18 @@ func OpenTSDB[T TSTable, O any](ctx context.Context, opts TSDBOpts[T, O]) (TSDB[ p := common.GetPosition(ctx) location := filepath.Clean(opts.Location) lfs.MkdirIfNotExist(location, dirPerm) - sir, err := newSeriesIndexController(ctx, opts) - if err != nil { - return nil, errors.Wrap(errOpenDatabase, errors.WithMessage(err, "create series index controller failed").Error()) - } l := logger.Fetch(ctx, p.Database) clock, _ := timestamp.GetClock(ctx) scheduler := timestamp.NewScheduler(l, clock) db := &database[T, O]{ - location: location, - scheduler: scheduler, - logger: l, - indexController: sir, - opts: opts, - tsEventCh: make(chan int64), - p: p, + location: location, + scheduler: scheduler, + logger: l, + opts: opts, + tsEventCh: make(chan int64), + p: p, + segmentController: newSegmentController[T](ctx, location, + l, opts), } db.logger.Info().Str("path", opts.Location).Msg("initialized") lockPath := filepath.Join(opts.Location, lockFilename) @@ -135,101 +125,18 @@ func OpenTSDB[T TSTable, O any](ctx context.Context, opts TSDBOpts[T, O]) (TSDB[ logger.Panicf("cannot create lock file %s: %s", lockPath, err) } db.lock = lock - if err = db.loadDatabase(); err != nil { - return nil, errors.Wrap(errOpenDatabase, errors.WithMessage(err, "load database failed").Error()) - } - return db, db.startRotationTask() -} - -func (d *database[T, O]) CreateTSTableIfNotExist(shardID common.ShardID, ts time.Time) (TSTableWrapper[T], error) { - if s, ok := d.getShard(shardID); ok { - d.RLock() - defer d.RUnlock() - return d.createTSTTable(s, ts) - } - d.Lock() - defer d.Unlock() - if s, ok := d.getShard(shardID); ok { - return d.createTSTTable(s, ts) - } - d.logger.Info().Int("shard_id", int(shardID)).Msg("creating a shard") - s, err := d.registerShard(shardID) - if err != nil { + if err := db.segmentController.open(); err != nil { return nil, err } - return d.createTSTTable(s, ts) -} - -func (d *database[T, O]) getShard(shardID common.ShardID) (*shard[T, O], bool) { - sLst := d.sLst.Load() - if sLst != nil { - for _, s := range *sLst { - if s.id == shardID { - return s, true - } - } - } - return nil, false + return db, db.startRotationTask() } -func (d *database[T, O]) createTSTTable(shard *shard[T, O], ts time.Time) (TSTableWrapper[T], error) { - timeRange := timestamp.NewInclusiveTimeRange(ts, ts) - ss := shard.segmentController.selectTSTables(timeRange) - if len(ss) > 0 { - return ss[0], nil - } - return shard.segmentController.createTSTable(ts) +func (d *database[T, O]) CreateSegmentIfNotExist(ts time.Time) (Segment[T, O], error) { + return d.segmentController.createSegment(ts) } -func (d *database[T, O]) SelectTSTables(timeRange timestamp.TimeRange) []TSTableWrapper[T] { - var result []TSTableWrapper[T] - sLst := d.sLst.Load() - if sLst == nil { - return result - } - for _, s := range *sLst { - result = append(result, s.segmentController.selectTSTables(timeRange)...) - } - return result -} - -func (d *database[T, O]) registerShard(id common.ShardID) (*shard[T, O], error) { - if s, ok := d.getShard(id); ok { - return s, nil - } - ctx := context.WithValue(context.Background(), logger.ContextKey, d.logger) - ctx = common.SetPosition(ctx, func(_ common.Position) common.Position { - return d.p - }) - so, err := d.openShard(ctx, id) - if err != nil { - return nil, err - } - var shardList []*shard[T, O] - sLst := d.sLst.Load() - if sLst != nil { - shardList = *sLst - } - shardList = append(shardList, so) - d.sLst.Store(&shardList) - return so, nil -} - -func (d *database[T, O]) loadDatabase() error { - d.Lock() - defer d.Unlock() - return walkDir(d.location, shardPathPrefix, func(suffix string) error { - shardID, err := strconv.Atoi(suffix) - if err != nil { - return err - } - if shardID >= int(d.opts.ShardNum) { - return nil - } - d.logger.Info().Int("shard_id", shardID).Msg("loaded a existed shard") - _, err = d.registerShard(common.ShardID(shardID)) - return err - }) +func (d *database[T, O]) SelectSegments(timeRange timestamp.TimeRange) []Segment[T, O] { + return d.segmentController.selectSegments(timeRange) } type walkFn func(suffix string) error diff --git a/banyand/measure/datapoints.go b/banyand/measure/datapoints.go index 3996e3a0..03b07093 100644 --- a/banyand/measure/datapoints.go +++ b/banyand/measure/datapoints.go @@ -144,15 +144,15 @@ func (d *dataPoints) Swap(i, j int) { type dataPointsInTable struct { timeRange timestamp.TimeRange - tsTable storage.TSTableWrapper[*tsTable] + tsTable *tsTable dataPoints dataPoints } type dataPointsInGroup struct { - tsdb storage.TSDB[*tsTable, option] - + tsdb storage.TSDB[*tsTable, option] docs index.Documents tables []*dataPointsInTable + segments []storage.Segment[*tsTable, option] latestTS int64 } diff --git a/banyand/measure/query.go b/banyand/measure/query.go index 785d772e..f2a8c2ba 100644 --- a/banyand/measure/query.go +++ b/banyand/measure/query.go @@ -74,13 +74,7 @@ func (s *measure) Query(ctx context.Context, mqo pbv1.MeasureQueryOptions) (mqr if db == nil { return mqr, nil } - tsdb := db.(storage.TSDB[*tsTable, option]) - tabWrappers := tsdb.SelectTSTables(*mqo.TimeRange) - defer func() { - for i := range tabWrappers { - tabWrappers[i].DecRef() - } - }() + series := make([]*pbv1.Series, len(mqo.Entities)) for i := range mqo.Entities { series[i] = &pbv1.Series{ @@ -88,13 +82,24 @@ func (s *measure) Query(ctx context.Context, mqo pbv1.MeasureQueryOptions) (mqr EntityValues: mqo.Entities[i], } } + var result queryResult + tsdb := db.(storage.TSDB[*tsTable, option]) + result.segments = tsdb.SelectSegments(*mqo.TimeRange) + if len(result.segments) < 1 { + return &result, nil + } + defer func() { + if err != nil { + result.Release() + } + }() - sl, err := tsdb.IndexDB().Search(ctx, series, mqo.Filter, mqo.Order, preloadSize) + sl, tables, err := s.searchSeriesList(ctx, series, mqo, result.segments) if err != nil { return nil, err } if len(sl) < 1 { - return mqr, nil + return &result, nil } var sids []common.SeriesID for i := range sl { @@ -107,9 +112,8 @@ func (s *measure) Query(ctx context.Context, mqo pbv1.MeasureQueryOptions) (mqr maxTimestamp: mqo.TimeRange.End.UnixNano(), } var n int - var result queryResult - for i := range tabWrappers { - s := tabWrappers[i].Table().currentSnapshot() + for i := range tables { + s := tables[i].currentSnapshot() if s == nil { continue } @@ -121,54 +125,7 @@ func (s *measure) Query(ctx context.Context, mqo pbv1.MeasureQueryOptions) (mqr result.snapshots = append(result.snapshots, s) } - func() { - bma := generateBlockMetadataArray() - defer releaseBlockMetadataArray(bma) - defFn := startBlockScanSpan(ctx, len(sids), parts, &result) - defer defFn() - // TODO: cache tstIter - var tstIter tstIter - defer tstIter.reset() - originalSids := make([]common.SeriesID, len(sids)) - copy(originalSids, sids) - sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j] }) - tstIter.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp) - if tstIter.Error() != nil { - err = fmt.Errorf("cannot init tstIter: %w", tstIter.Error()) - return - } - projectedEntityOffsets, tagProjectionOnPart := s.parseTagProjection(qo, &result) - result.tagProjection = qo.TagProjection - qo.TagProjection = tagProjectionOnPart - - for tstIter.nextBlock() { - bc := generateBlockCursor() - p := tstIter.piHeap[0] - - seriesID := p.curBlock.seriesID - if result.entityValues != nil && result.entityValues[seriesID] == nil { - for i := range sl { - if sl[i].ID == seriesID { - tag := make(map[string]*modelv1.TagValue) - for name, offset := range projectedEntityOffsets { - tag[name] = sl[i].EntityValues[offset] - } - result.entityValues[seriesID] = tag - } - } - } - bc.init(p.p, p.curBlock, qo) - result.data = append(result.data, bc) - } - if tstIter.Error() != nil { - err = fmt.Errorf("cannot iterate tstIter: %w", tstIter.Error()) - } - result.sidToIndex = make(map[common.SeriesID]int) - for i, si := range originalSids { - result.sidToIndex[si] = i - } - }() - if err != nil { + if err = s.searchBlocks(ctx, &result, sl, sids, parts, qo); err != nil { return nil, err } @@ -188,6 +145,68 @@ func (s *measure) Query(ctx context.Context, mqo pbv1.MeasureQueryOptions) (mqr return &result, nil } +func (s *measure) searchSeriesList(ctx context.Context, series []*pbv1.Series, mqo pbv1.MeasureQueryOptions, + segments []storage.Segment[*tsTable, option], +) (sl pbv1.SeriesList, tables []*tsTable, err error) { + for i := range segments { + tables = append(tables, segments[i].Tables()...) + sll, err := segments[i].IndexDB().Search(ctx, series, mqo.Filter, mqo.Order, preloadSize) + if err != nil { + return nil, nil, err + } + sl = append(sl, sll...) + } + return sl, tables, nil +} + +func (s *measure) searchBlocks(ctx context.Context, result *queryResult, sl pbv1.SeriesList, sids []common.SeriesID, parts []*part, qo queryOptions) error { + bma := generateBlockMetadataArray() + defer releaseBlockMetadataArray(bma) + defFn := startBlockScanSpan(ctx, len(sids), parts, result) + defer defFn() + // TODO: cache tstIter + var tstIter tstIter + defer tstIter.reset() + originalSids := make([]common.SeriesID, len(sids)) + copy(originalSids, sids) + sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j] }) + tstIter.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp) + if tstIter.Error() != nil { + return fmt.Errorf("cannot init tstIter: %w", tstIter.Error()) + } + projectedEntityOffsets, tagProjectionOnPart := s.parseTagProjection(qo, result) + result.tagProjection = qo.TagProjection + qo.TagProjection = tagProjectionOnPart + + for tstIter.nextBlock() { + bc := generateBlockCursor() + p := tstIter.piHeap[0] + + seriesID := p.curBlock.seriesID + if result.entityValues != nil && result.entityValues[seriesID] == nil { + for i := range sl { + if sl[i].ID == seriesID { + tag := make(map[string]*modelv1.TagValue) + for name, offset := range projectedEntityOffsets { + tag[name] = sl[i].EntityValues[offset] + } + result.entityValues[seriesID] = tag + } + } + } + bc.init(p.p, p.curBlock, qo) + result.data = append(result.data, bc) + } + if tstIter.Error() != nil { + return fmt.Errorf("cannot iterate tstIter: %w", tstIter.Error()) + } + result.sidToIndex = make(map[common.SeriesID]int) + for i, si := range originalSids { + result.sidToIndex[si] = i + } + return nil +} + func (s *measure) parseTagProjection(qo queryOptions, result *queryResult) (projectedEntityOffsets map[string]int, tagProjectionOnPart []pbv1.TagProjection) { projectedEntityOffsets = make(map[string]int) for i := range qo.TagProjection { @@ -373,6 +392,7 @@ type queryResult struct { tagProjection []pbv1.TagProjection data []*blockCursor snapshots []*snapshot + segments []storage.Segment[*tsTable, option] loaded bool orderByTS bool ascTS bool @@ -440,6 +460,9 @@ func (qr *queryResult) Release() { qr.snapshots[i].decRef() } qr.snapshots = qr.snapshots[:0] + for i := range qr.segments { + qr.segments[i].DecRef() + } } func (qr queryResult) Len() int { diff --git a/banyand/measure/write.go b/banyand/measure/write.go index 4c50c47f..5b2e695d 100644 --- a/banyand/measure/write.go +++ b/banyand/measure/write.go @@ -28,6 +28,7 @@ import ( databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index" @@ -64,8 +65,9 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me dpg, ok := dst[gn] if !ok { dpg = &dataPointsInGroup{ - tsdb: tsdb, - tables: make([]*dataPointsInTable, 0), + tsdb: tsdb, + tables: make([]*dataPointsInTable, 0), + segments: make([]storage.Segment[*tsTable, option], 0), } dst[gn] = dpg } @@ -82,15 +84,9 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me } shardID := common.ShardID(writeEvent.ShardId) if dpt == nil { - tstb, err := tsdb.CreateTSTableIfNotExist(shardID, t) - if err != nil { - return nil, fmt.Errorf("cannot create ts table: %w", err) - } - dpt = &dataPointsInTable{ - timeRange: tstb.GetTimeRange(), - tsTable: tstb, + if dpt, err = w.newDpt(tsdb, dpg, t, ts, shardID); err != nil { + return nil, fmt.Errorf("cannot create data points in table: %w", err) } - dpg.tables = append(dpg.tables, dpt) } dpt.dataPoints.timestamps = append(dpt.dataPoints.timestamps, ts) dpt.dataPoints.versions = append(dpt.dataPoints.versions, req.DataPoint.Version) @@ -211,6 +207,33 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me return dst, nil } +func (w *writeCallback) newDpt(tsdb storage.TSDB[*tsTable, option], dpg *dataPointsInGroup, t time.Time, ts int64, shardID common.ShardID) (*dataPointsInTable, error) { + var segment storage.Segment[*tsTable, option] + for _, seg := range dpg.segments { + if seg.GetTimeRange().Contains(ts) { + segment = seg + } + } + if segment == nil { + var err error + segment, err = tsdb.CreateSegmentIfNotExist(t) + if err != nil { + return nil, fmt.Errorf("cannot create segment: %w", err) + } + dpg.segments = append(dpg.segments, segment) + } + tstb, err := segment.CreateTSTableIfNotExist(shardID) + if err != nil { + return nil, fmt.Errorf("cannot create ts table: %w", err) + } + dpt := &dataPointsInTable{ + timeRange: segment.GetTimeRange(), + tsTable: tstb, + } + dpg.tables = append(dpg.tables, dpt) + return dpt, nil +} + func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) { events, ok := message.Data().([]any) if !ok { @@ -246,15 +269,17 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) { } for i := range groups { g := groups[i] - g.tsdb.Tick(g.latestTS) for j := range g.tables { dps := g.tables[j] - dps.tsTable.Table().mustAddDataPoints(&dps.dataPoints) - dps.tsTable.DecRef() + dps.tsTable.mustAddDataPoints(&dps.dataPoints) } - if err := g.tsdb.IndexDB().Write(g.docs); err != nil { - w.l.Error().Err(err).Msg("cannot write index") + for _, segment := range g.segments { + if err := segment.IndexDB().Write(g.docs); err != nil { + w.l.Error().Err(err).Msg("cannot write index") + } + segment.DecRef() } + g.tsdb.Tick(g.latestTS) } return } diff --git a/banyand/stream/benchmark_test.go b/banyand/stream/benchmark_test.go index f2fb7c34..633593da 100644 --- a/banyand/stream/benchmark_test.go +++ b/banyand/stream/benchmark_test.go @@ -23,7 +23,6 @@ import ( "io" "math" "math/big" - "path/filepath" "strconv" "sync/atomic" "testing" @@ -37,11 +36,9 @@ import ( modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/pkg/convert" - "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/index/posting" "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring" - "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" logicalstream "github.com/apache/skywalking-banyandb/pkg/query/logical/stream" "github.com/apache/skywalking-banyandb/pkg/test" @@ -195,41 +192,8 @@ func openDatabase(b *testing.B, path string) storage.TSDB[*tsTable, option] { func write(b *testing.B, p parameter, esList []*elements, docsList []index.Documents) storage.TSDB[*tsTable, option] { // Initialize a tstIter object. tmpPath, defFn := test.Space(require.New(b)) - segmentPath := filepath.Join(tmpPath, "shard-0", "seg-19700101") - fileSystem := fs.NewLocalFileSystem() defer defFn() - tst, err := newTSTable(fileSystem, segmentPath, common.Position{}, - // Since Stream deduplicate data in merging process, we need to disable the merging in the test. - logger.GetLogger("benchmark"), timestamp.TimeRange{}, option{flushTimeout: 0, mergePolicy: newDisabledMergePolicyForTesting()}) - require.NoError(b, err) - for i := 0; i < len(esList); i++ { - tst.mustAddElements(esList[i]) - tst.index.Write(docsList[i]) - time.Sleep(100 * time.Millisecond) - } - // wait until the introducer is done - if len(esList) > 0 { - for { - snp := tst.currentSnapshot() - if snp == nil { - time.Sleep(100 * time.Millisecond) - continue - } - if snp.creator != snapshotCreatorMemPart && len(snp.parts) == len(esList) { - snp.decRef() - tst.Close() - break - } - snp.decRef() - time.Sleep(100 * time.Millisecond) - } - } - data := []byte(version) - metadataPath := filepath.Join(segmentPath, segmentMetadataFilename) - lf, err := fileSystem.CreateLockFile(metadataPath, filePermission) - require.NoError(b, err) - _, err = lf.Write(data) - require.NoError(b, err) + db := openDatabase(b, tmpPath) var docs index.Documents for i := 1; i <= p.seriesCount; i++ { @@ -246,13 +210,22 @@ func write(b *testing.B, p parameter, esList []*elements, docsList []index.Docum Subject: "benchmark", EntityValues: entity, } - err = series.Marshal() + err := series.Marshal() require.NoError(b, err) docs = append(docs, index.Document{ DocID: uint64(i), EntityValues: series.Buffer, }) - db.IndexDB().Write(docs) + } + seg, err := db.CreateSegmentIfNotExist(time.Unix(0, esList[0].timestamps[0])) + require.NoError(b, err) + seg.IndexDB().Write(docs) + + tst, err := seg.CreateTSTableIfNotExist(common.ShardID(0)) + require.NoError(b, err) + for i := range esList { + tst.mustAddElements(esList[i]) + tst.Index().Write(docsList[i]) } return db } diff --git a/banyand/stream/elements.go b/banyand/stream/elements.go index 075a3b67..a45a1240 100644 --- a/banyand/stream/elements.go +++ b/banyand/stream/elements.go @@ -142,15 +142,17 @@ func (e *elements) Swap(i, j int) { type elementsInTable struct { timeRange timestamp.TimeRange - tsTable storage.TSTableWrapper[*tsTable] + tsTable *tsTable elements elements - docs index.Documents + + docs index.Documents } type elementsInGroup struct { tsdb storage.TSDB[*tsTable, option] docs index.Documents tables []*elementsInTable + segments []storage.Segment[*tsTable, option] latestTS int64 } diff --git a/banyand/stream/query.go b/banyand/stream/query.go index ef7925b9..e50d7b84 100644 --- a/banyand/stream/query.go +++ b/banyand/stream/query.go @@ -53,12 +53,16 @@ func (s *stream) Query(ctx context.Context, sqo pbv1.StreamQueryOptions) (sqr pb } var result queryResult tsdb := db.(storage.TSDB[*tsTable, option]) - result.tabWrappers = tsdb.SelectTSTables(*sqo.TimeRange) + result.segments = tsdb.SelectSegments(*sqo.TimeRange) + if len(result.segments) < 1 { + return &result, nil + } defer func() { - if sqr == nil { + if err != nil { result.Release() } }() + var tables []*tsTable series := make([]*pbv1.Series, len(sqo.Entities)) for i := range sqo.Entities { @@ -67,12 +71,19 @@ func (s *stream) Query(ctx context.Context, sqo pbv1.StreamQueryOptions) (sqr pb EntityValues: sqo.Entities[i], } } - seriesList, err := tsdb.Lookup(ctx, series) - if err != nil { - return nil, err + var seriesList, sl pbv1.SeriesList + for i := range result.segments { + tables = append(tables, result.segments[i].Tables()...) + sl, err = result.segments[i].Lookup(ctx, series) + if err != nil { + return nil, err + } + seriesList = append(seriesList, sl...) + result.tabs = append(result.tabs, tables...) } + if len(seriesList) == 0 { - return sqr, nil + return &result, nil } result.qo = queryOptions{ StreamQueryOptions: sqo, @@ -84,7 +95,7 @@ func (s *stream) Query(ctx context.Context, sqo pbv1.StreamQueryOptions) (sqr pb result.qo.seriesToEntity[seriesList[i].ID] = seriesList[i].EntityValues result.qo.sortedSids = append(result.qo.sortedSids, seriesList[i].ID) } - if result.qo.elementFilter, err = indexSearch(sqo, result.tabWrappers, seriesList); err != nil { + if result.qo.elementFilter, err = indexSearch(sqo, result.tabs, seriesList); err != nil { return nil, err } result.tagNameIndex = make(map[string]partition.TagLocator) @@ -105,7 +116,7 @@ func (s *stream) Query(ctx context.Context, sqo pbv1.StreamQueryOptions) (sqr pb if sqo.Order.Index == nil { result.orderByTS = true - } else if result.sortingIter, err = s.indexSort(sqo, result.tabWrappers, seriesList); err != nil { + } else if result.sortingIter, err = s.indexSort(sqo, result.tabs, seriesList); err != nil { return nil, err } if sqo.Order.Sort == modelv1.Sort_SORT_ASC || sqo.Order.Sort == modelv1.Sort_SORT_UNSPECIFIED { @@ -127,10 +138,11 @@ type queryResult struct { sortingIter itersort.Iterator[*index.ItemRef] tagNameIndex map[string]partition.TagLocator schema *databasev1.Stream - tabWrappers []storage.TSTableWrapper[*tsTable] + tabs []*tsTable elementIDsSorted []uint64 data []*blockCursor snapshots []*snapshot + segments []storage.Segment[*tsTable, option] qo queryOptions loaded bool orderByTS bool @@ -157,8 +169,8 @@ func (qr *queryResult) Pull(ctx context.Context) *pbv1.StreamResult { func (qr *queryResult) scanParts(ctx context.Context, qo queryOptions) error { var parts []*part var n int - for i := range qr.tabWrappers { - s := qr.tabWrappers[i].Table().currentSnapshot() + for i := range qr.tabs { + s := qr.tabs[i].currentSnapshot() if s == nil { continue } @@ -380,8 +392,8 @@ func (qr *queryResult) releaseBlockCursor() { func (qr *queryResult) Release() { qr.releaseParts() - for i := range qr.tabWrappers { - qr.tabWrappers[i].DecRef() + for i := range qr.segments { + qr.segments[i].DecRef() } } @@ -504,14 +516,14 @@ func (qr *queryResult) mergeByTimestamp() *pbv1.StreamResult { } func indexSearch(sqo pbv1.StreamQueryOptions, - tabWrappers []storage.TSTableWrapper[*tsTable], seriesList pbv1.SeriesList, + tabs []*tsTable, seriesList pbv1.SeriesList, ) (posting.List, error) { if sqo.Filter == nil || sqo.Filter == logical.ENode { return nil, nil } result := roaring.NewPostingList() - for _, tw := range tabWrappers { - index := tw.Table().Index() + for _, tw := range tabs { + index := tw.Index() pl, err := index.Search(seriesList, sqo.Filter) if err != nil { return nil, err @@ -526,13 +538,13 @@ func indexSearch(sqo pbv1.StreamQueryOptions, return result, nil } -func (s *stream) indexSort(sqo pbv1.StreamQueryOptions, tabWrappers []storage.TSTableWrapper[*tsTable], +func (s *stream) indexSort(sqo pbv1.StreamQueryOptions, tabs []*tsTable, seriesList pbv1.SeriesList, ) (itersort.Iterator[*index.ItemRef], error) { if sqo.Order == nil || sqo.Order.Index == nil { return nil, nil } - iters, err := s.buildItersByIndex(tabWrappers, seriesList, sqo) + iters, err := s.buildItersByIndex(tabs, seriesList, sqo) if err != nil { return nil, err } @@ -540,7 +552,7 @@ func (s *stream) indexSort(sqo pbv1.StreamQueryOptions, tabWrappers []storage.TS return itersort.NewItemIter[*index.ItemRef](iters, desc), nil } -func (s *stream) buildItersByIndex(tableWrappers []storage.TSTableWrapper[*tsTable], +func (s *stream) buildItersByIndex(tables []*tsTable, seriesList pbv1.SeriesList, sqo pbv1.StreamQueryOptions, ) (iters []itersort.Iterator[*index.ItemRef], err error) { indexRuleForSorting := sqo.Order.Index @@ -548,13 +560,13 @@ func (s *stream) buildItersByIndex(tableWrappers []storage.TSTableWrapper[*tsTab return nil, fmt.Errorf("only support one tag for sorting, but got %d", len(indexRuleForSorting.Tags)) } sids := seriesList.IDs() - for _, tw := range tableWrappers { + for _, tw := range tables { var iter index.FieldIterator[*index.ItemRef] fieldKey := index.FieldKey{ IndexRuleID: indexRuleForSorting.GetMetadata().GetId(), Analyzer: indexRuleForSorting.GetAnalyzer(), } - iter, err = tw.Table().Index().Sort(sids, fieldKey, sqo.Order.Sort, sqo.TimeRange, sqo.MaxElementSize) + iter, err = tw.Index().Sort(sids, fieldKey, sqo.Order.Sort, sqo.TimeRange, sqo.MaxElementSize) if err != nil { return nil, err } diff --git a/banyand/stream/query_test.go b/banyand/stream/query_test.go index 8cc5236d..5489884d 100644 --- a/banyand/stream/query_test.go +++ b/banyand/stream/query_test.go @@ -30,7 +30,6 @@ import ( "github.com/apache/skywalking-banyandb/api/common" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" - "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" @@ -308,7 +307,7 @@ func TestQueryResult(t *testing.T) { TagProjection: tagProjectionAll, }, } - result.tabWrappers = []storage.TSTableWrapper[*tsTable]{&tsTableWrapper{tst}} + result.tabs = []*tsTable{tst} defer result.Release() if !tt.orderBySeries { result.orderByTS = true @@ -401,22 +400,6 @@ func TestQueryResult(t *testing.T) { } } -type tsTableWrapper struct { - *tsTable -} - -func (t *tsTableWrapper) Table() *tsTable { - return t.tsTable -} - -func (t *tsTableWrapper) GetTimeRange() timestamp.TimeRange { - return timestamp.TimeRange{} -} - -func (t *tsTableWrapper) DecRef() { - t.tsTable.Close() -} - func emptyTagFamilies(size int) []pbv1.TagFamily { var values []*modelv1.TagValue for i := 0; i < size; i++ { diff --git a/banyand/stream/write.go b/banyand/stream/write.go index bcb1c3f9..2ba742ba 100644 --- a/banyand/stream/write.go +++ b/banyand/stream/write.go @@ -28,6 +28,7 @@ import ( databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" + "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index" @@ -66,8 +67,9 @@ func (w *writeCallback) handle(dst map[string]*elementsInGroup, writeEvent *stre eg, ok := dst[gn] if !ok { eg = &elementsInGroup{ - tsdb: tsdb, - tables: make([]*elementsInTable, 0), + tsdb: tsdb, + tables: make([]*elementsInTable, 0), + segments: make([]storage.Segment[*tsTable, option], 0), } dst[gn] = eg } @@ -84,13 +86,26 @@ func (w *writeCallback) handle(dst map[string]*elementsInGroup, writeEvent *stre } shardID := common.ShardID(writeEvent.ShardId) if et == nil { - tsdb, err := tsdb.CreateTSTableIfNotExist(shardID, t) + var segment storage.Segment[*tsTable, option] + for _, seg := range eg.segments { + if seg.GetTimeRange().Contains(ts) { + segment = seg + } + } + if segment == nil { + segment, err = tsdb.CreateSegmentIfNotExist(t) + if err != nil { + return nil, fmt.Errorf("cannot create segment: %w", err) + } + eg.segments = append(eg.segments, segment) + } + tstb, err := segment.CreateTSTableIfNotExist(shardID) if err != nil { return nil, fmt.Errorf("cannot create ts table: %w", err) } et = &elementsInTable{ - timeRange: tsdb.GetTimeRange(), - tsTable: tsdb, + timeRange: segment.GetTimeRange(), + tsTable: tstb, } eg.tables = append(eg.tables, et) } @@ -239,23 +254,25 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) { } for i := range groups { g := groups[i] - g.tsdb.Tick(g.latestTS) for j := range g.tables { es := g.tables[j] - es.tsTable.Table().mustAddElements(&es.elements) + es.tsTable.mustAddElements(&es.elements) if len(es.docs) > 0 { - index := es.tsTable.Table().Index() + index := es.tsTable.Index() if err := index.Write(es.docs); err != nil { w.l.Error().Err(err).Msg("cannot write element index") } } - es.tsTable.DecRef() } if len(g.docs) > 0 { - if err := g.tsdb.IndexDB().Write(g.docs); err != nil { - w.l.Error().Err(err).Msg("cannot write series index") + for _, segment := range g.segments { + if err := segment.IndexDB().Write(g.docs); err != nil { + w.l.Error().Err(err).Msg("cannot write index") + } + segment.DecRef() } } + g.tsdb.Tick(g.latestTS) } return }