This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch idx-gc in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 016c3f59fd9057a6bde3ef5e20d4ed31d6b1872d Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Thu Apr 4 03:32:08 2024 +0000 Add more test cases Signed-off-by: Gao Hongtao <hanahm...@gmail.com> --- banyand/internal/storage/index.go | 253 +++++++++++++++++++-------------- banyand/internal/storage/index_test.go | 86 ++++++++++- banyand/internal/storage/retention.go | 12 +- banyand/internal/storage/segment.go | 16 +-- banyand/internal/storage/storage.go | 10 ++ pkg/fs/file_system.go | 7 +- pkg/fs/local_file_system.go | 17 +-- pkg/fs/local_file_system_test.go | 2 +- 8 files changed, 255 insertions(+), 148 deletions(-) diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go index f19fc166..7bc91fea 100644 --- a/banyand/internal/storage/index.go +++ b/banyand/internal/storage/index.go @@ -20,8 +20,11 @@ package storage import ( "context" "fmt" - "os" + "path" "path/filepath" + "sort" + "strconv" + "strings" "sync" "time" @@ -47,19 +50,21 @@ func (d *database[T, O]) Lookup(ctx context.Context, series *pbv1.Series) (pbv1. } type seriesIndex struct { - l *logger.Logger - store index.SeriesStore - name string + startTime time.Time + store index.SeriesStore + l *logger.Logger + path string } -func newSeriesIndex(ctx context.Context, path, name string, flushTimeoutSeconds int64) (*seriesIndex, error) { +func newSeriesIndex(ctx context.Context, path string, startTime time.Time, flushTimeoutSeconds int64) (*seriesIndex, error) { si := &seriesIndex{ - name: name, - l: logger.Fetch(ctx, "series_index"), + path: path, + startTime: startTime, + l: logger.Fetch(ctx, "series_index"), } var err error if si.store, err = inverted.NewStore(inverted.StoreOpts{ - Path: filepath.Join(path, name), + Path: path, Logger: si.l, BatchWaitSec: flushTimeoutSeconds, }); err != nil { @@ -233,39 +238,67 @@ func (s *seriesIndex) Close() error { } type seriesIndexController[T TSTable, O any] struct { - ctx context.Context - hot *seriesIndex - standby *seriesIndex - timestamp.TimeRange - l *logger.Logger - opts TSDBOpts[T, O] + clock timestamp.Clock + hot *seriesIndex + standby *seriesIndex + l *logger.Logger + location string + opts TSDBOpts[T, O] + standbyLiveTime time.Duration sync.RWMutex } -func standard(t time.Time, unit IntervalUnit) time.Time { - switch unit { - case HOUR: - return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, t.Location()) - case DAY: - return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()) - } - panic("invalid interval unit") -} - func newSeriesIndexController[T TSTable, O any]( ctx context.Context, opts TSDBOpts[T, O], ) (*seriesIndexController[T, O], error) { - var hpath, spath string l := logger.Fetch(ctx, "seriesIndexController") - startTime := standard(time.Now(), opts.TTL.Unit) - endTime := startTime.Add(opts.TTL.estimatedDuration()) - timeRange := timestamp.NewSectionTimeRange(startTime, endTime) - location := filepath.Clean(opts.Location) + clock, ctx := timestamp.GetClock(ctx) + var standbyLiveTime time.Duration + switch opts.TTL.Unit { + case HOUR: + standbyLiveTime = time.Hour + case DAY: + standbyLiveTime = 24 * time.Hour + default: + } + sic := &seriesIndexController[T, O]{ + opts: opts, + clock: clock, + standbyLiveTime: standbyLiveTime, + location: filepath.Clean(opts.Location), + l: l, + } + idxName, err := sic.loadIdx() + if err != nil { + return nil, err + } + switch len(idxName) { + case 0: + if sic.hot, err = sic.newIdx(ctx); err != nil { + return nil, err + } + case 1: + if sic.hot, err = sic.openIdx(ctx, idxName[0]); err != nil { + return nil, err + } + case 2: + if sic.hot, err = sic.openIdx(ctx, idxName[0]); err != nil { + return nil, err + } + if sic.standby, err = sic.openIdx(ctx, idxName[1]); err != nil { + return nil, err + } + default: + return nil, errors.New("unexpected series index count") + } + return sic, nil +} +func (sic *seriesIndexController[T, O]) loadIdx() ([]string, error) { idxName := make([]string, 0) if err := walkDir( - location, + sic.location, "idx", func(suffix string) error { idxName = append(idxName, "idx-"+suffix) @@ -273,120 +306,122 @@ func newSeriesIndexController[T TSTable, O any]( }); err != nil { return nil, err } - if len(idxName) != 0 { - hpath = idxName[0] - } else { - hpath = fmt.Sprintf("idx-%016x", time.Now().UnixNano()) - } - h, err := newSeriesIndex(ctx, location, hpath, opts.SeriesIndexFlushTimeoutSeconds) - if err != nil { - return nil, err - } - sir := &seriesIndexController[T, O]{ - hot: h, - ctx: ctx, - opts: opts, - TimeRange: timeRange, - l: l, + sort.StringSlice(idxName).Sort() + if len(idxName) > 2 { + redundantIdx := idxName[:len(idxName)-2] + for i := range redundantIdx { + lfs.MustRMAll(filepath.Join(sic.location, redundantIdx[i])) + } + idxName = idxName[len(idxName)-2:] } - if len(idxName) == 2 { - spath = idxName[1] - sb, err := newSeriesIndex(ctx, location, spath, opts.SeriesIndexFlushTimeoutSeconds) + return idxName, nil +} + +func (sic *seriesIndexController[T, O]) newIdx(ctx context.Context) (*seriesIndex, error) { + return sic.openIdx(ctx, fmt.Sprintf("idx-%016x", time.Now().UnixNano())) +} + +func (sic *seriesIndexController[T, O]) openIdx(ctx context.Context, name string) (*seriesIndex, error) { + p := path.Join(sic.location, name) + if ts, ok := strings.CutPrefix(name, "idx-"); ok { + t, err := strconv.ParseInt(ts, 16, 64) if err != nil { return nil, err } - sir.standby = sb + + return newSeriesIndex(ctx, p, sic.opts.TTL.Unit.standard(time.Unix(0, t)), sic.opts.SeriesIndexFlushTimeoutSeconds) } - return sir, nil + return nil, errors.New("unexpected series index name") } -func (sir *seriesIndexController[T, O]) run(deadline time.Time) (err error) { - sir.l.Info().Time("deadline", deadline).Msg("start to swap series index") - if sir.End.Before(deadline) { - sir.Lock() - defer sir.Unlock() - - sir.hot, sir.standby = sir.standby, sir.hot - go func() { - <-time.After(time.Hour) - sir.Lock() - defer sir.Unlock() - err = sir.standby.Close() - if err != nil { - sir.l.Error().Msg("fail to close standby series index") - } - location := filepath.Clean(sir.opts.Location) - root := filepath.Join(location, sir.standby.name) - err = os.RemoveAll(root) +func (sic *seriesIndexController[T, O]) run(deadline time.Time) (err error) { + var standby *seriesIndex + ctx := context.WithValue(context.Background(), logger.ContextKey, sic.l) + _, err = sic.loadIdx() + if err != nil { + sic.l.Warn().Err(err).Msg("fail to clear redundant series index") + } + if sic.hot.startTime.Before(deadline) { + sic.l.Info().Time("deadline", deadline).Msg("start to swap series index") + sic.Lock() + if sic.standby == nil { + sic.standby, err = sic.newIdx(ctx) if err != nil { - sir.l.Error().Msg("fail to remove expired standby directory") + sic.Unlock() + return err } - sir.standby = nil - }() - - startTime := standard(time.Now(), sir.opts.TTL.Unit) - endTime := startTime.Add(sir.opts.TTL.estimatedDuration()) - sir.TimeRange = timestamp.NewSectionTimeRange(startTime, endTime) + } + standby = sic.hot + sic.hot = sic.standby + sic.standby = nil + sic.Unlock() + err = standby.Close() + if err != nil { + sic.l.Warn().Err(err).Msg("fail to close standby series index") + } + lfs.MustRMAll(standby.path) + sic.l.Info().Str("path", standby.path).Msg("dropped series index") + lfs.SyncPath(sic.location) } - if sir.End.Sub(deadline) < time.Hour { - location := filepath.Clean(sir.opts.Location) - path := fmt.Sprintf("idx-%016x", time.Now().UnixNano()) - sir.standby, err = newSeriesIndex(sir.ctx, location, path, sir.opts.SeriesIndexFlushTimeoutSeconds) + + liveTime := sic.hot.startTime.Sub(deadline) + if liveTime > 0 && liveTime < sic.standbyLiveTime { + sic.l.Info().Time("deadline", deadline).Msg("start to create standby series index") + standby, err = sic.newIdx(ctx) if err != nil { return err } + sic.Lock() + sic.standby = standby + sic.Unlock() } return nil } -func (sir *seriesIndexController[T, O]) Write(docs index.Documents) error { - sir.Lock() - defer sir.Unlock() - if sir.standby != nil { - err := sir.standby.Write(docs) - if err != nil { - sir.l.Error().Msg("fail to write docs in standby series index") - } +func (sic *seriesIndexController[T, O]) Write(docs index.Documents) error { + sic.RLock() + defer sic.RUnlock() + if sic.standby != nil { + return sic.standby.Write(docs) } - return sir.hot.Write(docs) + return sic.hot.Write(docs) } -func (sir *seriesIndexController[T, O]) searchPrimary(ctx context.Context, series *pbv1.Series) (pbv1.SeriesList, error) { - sir.RLock() - defer sir.RUnlock() +func (sic *seriesIndexController[T, O]) searchPrimary(ctx context.Context, series *pbv1.Series) (pbv1.SeriesList, error) { + sic.RLock() + defer sic.RUnlock() - sl, err := sir.hot.searchPrimary(ctx, series) + sl, err := sic.hot.searchPrimary(ctx, series) if err != nil { return nil, err } - if len(sl) > 0 || sir.standby == nil { + if len(sl) > 0 || sic.standby == nil { return sl, nil } - return sir.standby.searchPrimary(ctx, series) + return sic.standby.searchPrimary(ctx, series) } -func (sir *seriesIndexController[T, O]) Search(ctx context.Context, series *pbv1.Series, filter index.Filter, order *pbv1.OrderBy) (pbv1.SeriesList, error) { - sir.RLock() - defer sir.RUnlock() +func (sic *seriesIndexController[T, O]) Search(ctx context.Context, series *pbv1.Series, + filter index.Filter, order *pbv1.OrderBy, preloadSize int, +) (pbv1.SeriesList, error) { + sic.RLock() + defer sic.RUnlock() - sl, err := sir.hot.Search(ctx, series, filter, order) + sl, err := sic.hot.Search(ctx, series, filter, order, preloadSize) if err != nil { return nil, err } - if len(sl) > 0 || sir.standby == nil { + if len(sl) > 0 || sic.standby == nil { return sl, nil } - return sir.standby.Search(ctx, series, filter, order) + return sic.standby.Search(ctx, series, filter, order, preloadSize) } -func (sir *seriesIndexController[T, O]) Close() error { - sir.Lock() - defer sir.Unlock() - if sir.standby != nil { - err := sir.standby.Close() - if err != nil { - return err - } +func (sic *seriesIndexController[T, O]) Close() error { + sic.Lock() + defer sic.Unlock() + if sic.standby != nil { + return multierr.Combine(sic.hot.Close(), sic.standby.Close()) } - return sir.hot.Close() + return sic.hot.Close() } diff --git a/banyand/internal/storage/index_test.go b/banyand/internal/storage/index_test.go index 066f3b00..a7adc370 100644 --- a/banyand/internal/storage/index_test.go +++ b/banyand/internal/storage/index_test.go @@ -20,7 +20,10 @@ package storage import ( "context" "fmt" + "os" + "path" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -38,7 +41,7 @@ var testSeriesPool pbv1.SeriesPool func TestSeriesIndex_Primary(t *testing.T) { ctx := context.Background() path, fn := setUp(require.New(t)) - si, err := newSeriesIndex(ctx, path, "idx", 0) + si, err := newSeriesIndex(ctx, path, time.Now(), 0) require.NoError(t, err) defer func() { require.NoError(t, si.Close()) @@ -69,7 +72,7 @@ func TestSeriesIndex_Primary(t *testing.T) { require.NoError(t, si.Write(docs)) // Restart the index require.NoError(t, si.Close()) - si, err = newSeriesIndex(ctx, path, "idx", 0) + si, err = newSeriesIndex(ctx, path, time.Now(), 0) require.NoError(t, err) tests := []struct { name string @@ -140,3 +143,82 @@ func setUp(t *require.Assertions) (tempDir string, deferFunc func()) { tempDir, deferFunc = test.Space(t) return tempDir, deferFunc } + +func TestSeriesIndexController(t *testing.T) { + ttl := IntervalRule{ + Unit: DAY, + Num: 3, + } + t.Run("Test setup", func(t *testing.T) { + ctx := context.Background() + tmpDir, dfFn, err := test.NewSpace() + require.NoError(t, err) + defer dfFn() + + opts := TSDBOpts[TSTable, any]{ + Location: tmpDir, + TTL: ttl, + } + + sic, err := newSeriesIndexController(ctx, opts) + assert.NoError(t, err) + assert.NotNil(t, sic) + idxNames := make([]string, 0) + walkDir(tmpDir, "idx-", func(suffix string) error { + idxNames = append(idxNames, suffix) + return nil + }) + assert.Equal(t, 1, len(idxNames)) + require.NoError(t, sic.Close()) + sic, err = newSeriesIndexController(ctx, opts) + assert.NoError(t, err) + assert.NotNil(t, sic) + idxNames = idxNames[:0] + walkDir(tmpDir, "idx-", func(suffix string) error { + idxNames = append(idxNames, suffix) + return nil + }) + assert.Equal(t, 1, len(idxNames)) + require.NoError(t, sic.Close()) + + require.NoError(t, os.MkdirAll(path.Join(tmpDir, fmt.Sprintf("idx-%016x", time.Now().UnixNano()-20000)), 0o755)) + require.NoError(t, os.MkdirAll(path.Join(tmpDir, fmt.Sprintf("idx-%016x", time.Now().UnixNano()-10000)), 0o755)) + sic, err = newSeriesIndexController(ctx, opts) + assert.NoError(t, err) + assert.NotNil(t, sic) + idxNames = idxNames[:0] + walkDir(tmpDir, "idx-", func(suffix string) error { + idxNames = append(idxNames, suffix) + return nil + }) + assert.Equal(t, 2, len(idxNames)) + require.NoError(t, sic.Close()) + }) + + t.Run("Test retention", func(t *testing.T) { + ctx := context.Background() + tmpDir, dfFn, err := test.NewSpace() + require.NoError(t, err) + defer dfFn() + + opts := TSDBOpts[TSTable, any]{ + Location: tmpDir, + TTL: ttl, + } + sic, err := newSeriesIndexController(ctx, opts) + require.NoError(t, err) + defer sic.Close() + require.NoError(t, sic.run(time.Now().Add(-time.Hour*23))) + assert.NotNil(t, sic.standby) + idxNames := make([]string, 0) + walkDir(tmpDir, "idx-", func(suffix string) error { + idxNames = append(idxNames, suffix) + return nil + }) + assert.Equal(t, 2, len(idxNames)) + nextTime := sic.standby.startTime + require.NoError(t, sic.run(time.Now().Add(time.Hour))) + assert.Nil(t, sic.standby) + assert.Equal(t, nextTime, sic.hot.startTime) + }) +} diff --git a/banyand/internal/storage/retention.go b/banyand/internal/storage/retention.go index af9cef05..445b7f56 100644 --- a/banyand/internal/storage/retention.go +++ b/banyand/internal/storage/retention.go @@ -51,12 +51,18 @@ func newRetentionTask[T TSTable, O any](database *database[T, O], ttl IntervalRu } func (rc *retentionTask[T, O]) run(now time.Time, l *logger.Logger) bool { - for _, shard := range rc.database.sLst { - if err := shard.segmentController.remove(now.Add(-rc.duration)); err != nil { + var shardList []*shard[T, O] + rc.database.RLock() + shardList = append(shardList, rc.database.sLst...) + rc.database.RUnlock() + deadline := now.Add(-rc.duration) + + for _, shard := range shardList { + if err := shard.segmentController.remove(deadline); err != nil { l.Error().Err(err) } } - if err := rc.database.indexController.run(now.Add(-rc.duration)); err != nil { + if err := rc.database.indexController.run(deadline); err != nil { l.Error().Err(err) } return true diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go index 2436a6f4..22890ddc 100644 --- a/banyand/internal/storage/segment.go +++ b/banyand/internal/storage/segment.go @@ -71,7 +71,7 @@ func openSegment[T TSTable](ctx context.Context, startTime, endTime time.Time, p l := logger.Fetch(ctx, s.String()) s.l = l clock, _ := timestamp.GetClock(ctx) - s.Reporter = bucket.NewTimeBasedReporter(fmt.Sprintf("Shard-%s-%s", p.Shard, s.String()), timeRange, clock, scheduler) + s.Reporter = bucket.NewTimeBasedReporter(fmt.Sprintf("%s-%s", p.Shard, s.String()), timeRange, clock, scheduler) return s, nil } @@ -181,7 +181,7 @@ func (sc *segmentController[T, O]) segments() (ss []*segment[T]) { } func (sc *segmentController[T, O]) Current() (bucket.Reporter, error) { - now := sc.Standard(sc.clock.Now()) + now := sc.segmentSize.Unit.standard(sc.clock.Now()) ns := uint64(now.UnixNano()) if b := func() bucket.Reporter { sc.RLock() @@ -222,16 +222,6 @@ func (sc *segmentController[T, O]) OnMove(prev bucket.Reporter, next bucket.Repo event.Msg("move to the next segment") } -func (sc *segmentController[T, O]) Standard(t time.Time) time.Time { - switch sc.segmentSize.Unit { - case HOUR: - return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, t.Location()) - case DAY: - return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()) - } - panic("invalid interval unit") -} - func (sc *segmentController[T, O]) Format(tm time.Time) string { switch sc.segmentSize.Unit { case HOUR: @@ -282,7 +272,7 @@ func (sc *segmentController[T, O]) open() error { func (sc *segmentController[T, O]) create(start time.Time) (*segment[T], error) { sc.Lock() defer sc.Unlock() - start = sc.Standard(start) + start = sc.segmentSize.Unit.standard(start) var next *segment[T] for _, s := range sc.lst { if s.Contains(uint64(start.UnixNano())) { diff --git a/banyand/internal/storage/storage.go b/banyand/internal/storage/storage.go index bce8c8b7..79431224 100644 --- a/banyand/internal/storage/storage.go +++ b/banyand/internal/storage/storage.go @@ -114,6 +114,16 @@ func (iu IntervalUnit) String() string { panic("invalid interval unit") } +func (iu IntervalUnit) standard(t time.Time) time.Time { + switch iu { + case HOUR: + return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, t.Location()) + case DAY: + return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()) + } + panic("invalid interval unit") +} + // IntervalRule defines a length of two points in time. type IntervalRule struct { Unit IntervalUnit diff --git a/pkg/fs/file_system.go b/pkg/fs/file_system.go index 09fe1de1..a2ed0950 100644 --- a/pkg/fs/file_system.go +++ b/pkg/fs/file_system.go @@ -20,7 +20,6 @@ package fs import ( "io" - "os" "github.com/apache/skywalking-banyandb/pkg/logger" ) @@ -88,8 +87,6 @@ type File interface { Size() (int64, error) // Returns the absolute path of the file. Path() string - // Clear file content - Clear() error // Close File. Close() error } @@ -105,7 +102,7 @@ type FileSystem interface { // ReadDir reads the directory named by dirname and returns a list of directory entries sorted by filename. ReadDir(dirname string) []DirEntry // Create and open the file by specified name and mode. - CreateFile(name string, flag int, permission Mode) (File, error) + CreateFile(name string, permission Mode) (File, error) // Create and open lock file by specified name and mode. CreateLockFile(name string, permission Mode) (File, error) // Open the file by specified name and mode. @@ -135,7 +132,7 @@ type DirEntry interface { // MustCreateFile creates a new file with the specified name and permission. func MustCreateFile(fs FileSystem, path string, permission Mode) File { - f, err := fs.CreateFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, permission) + f, err := fs.CreateFile(path, permission) if err != nil { logger.GetLogger().Panic().Err(err).Str("path", path).Msg("cannot create file") } diff --git a/pkg/fs/local_file_system.go b/pkg/fs/local_file_system.go index 3a59ae24..b943a6f1 100644 --- a/pkg/fs/local_file_system.go +++ b/pkg/fs/local_file_system.go @@ -126,8 +126,8 @@ func (fs *localFileSystem) ReadDir(dirname string) []DirEntry { } // CreateFile is used to create and open the file by specified name and mode. -func (fs *localFileSystem) CreateFile(name string, flag int, permission Mode) (File, error) { - file, err := os.OpenFile(name, flag, os.FileMode(permission)) +func (fs *localFileSystem) CreateFile(name string, permission Mode) (File, error) { + file, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(permission)) switch { case err == nil: return &LocalFile{ @@ -417,19 +417,6 @@ func (file *LocalFile) Close() error { return nil } -// Clear file content. -func (file *LocalFile) Clear() error { - err := file.file.Truncate(0) - if err != nil { - return err - } - _, err = file.file.Seek(0, 0) - if err != nil { - return err - } - return nil -} - type seqReader struct { reader *bufio.Reader fileName string diff --git a/pkg/fs/local_file_system_test.go b/pkg/fs/local_file_system_test.go index 77858d06..eaf278a5 100644 --- a/pkg/fs/local_file_system_test.go +++ b/pkg/fs/local_file_system_test.go @@ -46,7 +46,7 @@ var _ = ginkgo.Describe("Loacl File System", func() { fs = NewLocalFileSystem() err := os.MkdirAll(dirName, 0o777) gomega.Expect(err).ToNot(gomega.HaveOccurred()) - file, err = fs.CreateFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0o777) + file, err = fs.CreateFile(fileName, 0o777) gomega.Expect(err).ToNot(gomega.HaveOccurred()) _, err = os.Stat(fileName) gomega.Expect(err).ToNot(gomega.HaveOccurred())