This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch idx-gc
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 016c3f59fd9057a6bde3ef5e20d4ed31d6b1872d
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Thu Apr 4 03:32:08 2024 +0000

    Add more test cases
    
    Signed-off-by: Gao Hongtao <hanahm...@gmail.com>
---
 banyand/internal/storage/index.go      | 253 +++++++++++++++++++--------------
 banyand/internal/storage/index_test.go |  86 ++++++++++-
 banyand/internal/storage/retention.go  |  12 +-
 banyand/internal/storage/segment.go    |  16 +--
 banyand/internal/storage/storage.go    |  10 ++
 pkg/fs/file_system.go                  |   7 +-
 pkg/fs/local_file_system.go            |  17 +--
 pkg/fs/local_file_system_test.go       |   2 +-
 8 files changed, 255 insertions(+), 148 deletions(-)

diff --git a/banyand/internal/storage/index.go 
b/banyand/internal/storage/index.go
index f19fc166..7bc91fea 100644
--- a/banyand/internal/storage/index.go
+++ b/banyand/internal/storage/index.go
@@ -20,8 +20,11 @@ package storage
 import (
        "context"
        "fmt"
-       "os"
+       "path"
        "path/filepath"
+       "sort"
+       "strconv"
+       "strings"
        "sync"
        "time"
 
@@ -47,19 +50,21 @@ func (d *database[T, O]) Lookup(ctx context.Context, series 
*pbv1.Series) (pbv1.
 }
 
 type seriesIndex struct {
-       l     *logger.Logger
-       store index.SeriesStore
-       name  string
+       startTime time.Time
+       store     index.SeriesStore
+       l         *logger.Logger
+       path      string
 }
 
-func newSeriesIndex(ctx context.Context, path, name string, 
flushTimeoutSeconds int64) (*seriesIndex, error) {
+func newSeriesIndex(ctx context.Context, path string, startTime time.Time, 
flushTimeoutSeconds int64) (*seriesIndex, error) {
        si := &seriesIndex{
-               name: name,
-               l:    logger.Fetch(ctx, "series_index"),
+               path:      path,
+               startTime: startTime,
+               l:         logger.Fetch(ctx, "series_index"),
        }
        var err error
        if si.store, err = inverted.NewStore(inverted.StoreOpts{
-               Path:         filepath.Join(path, name),
+               Path:         path,
                Logger:       si.l,
                BatchWaitSec: flushTimeoutSeconds,
        }); err != nil {
@@ -233,39 +238,67 @@ func (s *seriesIndex) Close() error {
 }
 
 type seriesIndexController[T TSTable, O any] struct {
-       ctx     context.Context
-       hot     *seriesIndex
-       standby *seriesIndex
-       timestamp.TimeRange
-       l    *logger.Logger
-       opts TSDBOpts[T, O]
+       clock           timestamp.Clock
+       hot             *seriesIndex
+       standby         *seriesIndex
+       l               *logger.Logger
+       location        string
+       opts            TSDBOpts[T, O]
+       standbyLiveTime time.Duration
        sync.RWMutex
 }
 
-func standard(t time.Time, unit IntervalUnit) time.Time {
-       switch unit {
-       case HOUR:
-               return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 
0, t.Location())
-       case DAY:
-               return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, 
t.Location())
-       }
-       panic("invalid interval unit")
-}
-
 func newSeriesIndexController[T TSTable, O any](
        ctx context.Context,
        opts TSDBOpts[T, O],
 ) (*seriesIndexController[T, O], error) {
-       var hpath, spath string
        l := logger.Fetch(ctx, "seriesIndexController")
-       startTime := standard(time.Now(), opts.TTL.Unit)
-       endTime := startTime.Add(opts.TTL.estimatedDuration())
-       timeRange := timestamp.NewSectionTimeRange(startTime, endTime)
-       location := filepath.Clean(opts.Location)
+       clock, ctx := timestamp.GetClock(ctx)
+       var standbyLiveTime time.Duration
+       switch opts.TTL.Unit {
+       case HOUR:
+               standbyLiveTime = time.Hour
+       case DAY:
+               standbyLiveTime = 24 * time.Hour
+       default:
+       }
+       sic := &seriesIndexController[T, O]{
+               opts:            opts,
+               clock:           clock,
+               standbyLiveTime: standbyLiveTime,
+               location:        filepath.Clean(opts.Location),
+               l:               l,
+       }
+       idxName, err := sic.loadIdx()
+       if err != nil {
+               return nil, err
+       }
+       switch len(idxName) {
+       case 0:
+               if sic.hot, err = sic.newIdx(ctx); err != nil {
+                       return nil, err
+               }
+       case 1:
+               if sic.hot, err = sic.openIdx(ctx, idxName[0]); err != nil {
+                       return nil, err
+               }
+       case 2:
+               if sic.hot, err = sic.openIdx(ctx, idxName[0]); err != nil {
+                       return nil, err
+               }
+               if sic.standby, err = sic.openIdx(ctx, idxName[1]); err != nil {
+                       return nil, err
+               }
+       default:
+               return nil, errors.New("unexpected series index count")
+       }
+       return sic, nil
+}
 
+func (sic *seriesIndexController[T, O]) loadIdx() ([]string, error) {
        idxName := make([]string, 0)
        if err := walkDir(
-               location,
+               sic.location,
                "idx",
                func(suffix string) error {
                        idxName = append(idxName, "idx-"+suffix)
@@ -273,120 +306,122 @@ func newSeriesIndexController[T TSTable, O any](
                }); err != nil {
                return nil, err
        }
-       if len(idxName) != 0 {
-               hpath = idxName[0]
-       } else {
-               hpath = fmt.Sprintf("idx-%016x", time.Now().UnixNano())
-       }
-       h, err := newSeriesIndex(ctx, location, hpath, 
opts.SeriesIndexFlushTimeoutSeconds)
-       if err != nil {
-               return nil, err
-       }
-       sir := &seriesIndexController[T, O]{
-               hot:       h,
-               ctx:       ctx,
-               opts:      opts,
-               TimeRange: timeRange,
-               l:         l,
+       sort.StringSlice(idxName).Sort()
+       if len(idxName) > 2 {
+               redundantIdx := idxName[:len(idxName)-2]
+               for i := range redundantIdx {
+                       lfs.MustRMAll(filepath.Join(sic.location, 
redundantIdx[i]))
+               }
+               idxName = idxName[len(idxName)-2:]
        }
-       if len(idxName) == 2 {
-               spath = idxName[1]
-               sb, err := newSeriesIndex(ctx, location, spath, 
opts.SeriesIndexFlushTimeoutSeconds)
+       return idxName, nil
+}
+
+func (sic *seriesIndexController[T, O]) newIdx(ctx context.Context) 
(*seriesIndex, error) {
+       return sic.openIdx(ctx, fmt.Sprintf("idx-%016x", time.Now().UnixNano()))
+}
+
+func (sic *seriesIndexController[T, O]) openIdx(ctx context.Context, name 
string) (*seriesIndex, error) {
+       p := path.Join(sic.location, name)
+       if ts, ok := strings.CutPrefix(name, "idx-"); ok {
+               t, err := strconv.ParseInt(ts, 16, 64)
                if err != nil {
                        return nil, err
                }
-               sir.standby = sb
+
+               return newSeriesIndex(ctx, p, 
sic.opts.TTL.Unit.standard(time.Unix(0, t)), 
sic.opts.SeriesIndexFlushTimeoutSeconds)
        }
-       return sir, nil
+       return nil, errors.New("unexpected series index name")
 }
 
-func (sir *seriesIndexController[T, O]) run(deadline time.Time) (err error) {
-       sir.l.Info().Time("deadline", deadline).Msg("start to swap series 
index")
-       if sir.End.Before(deadline) {
-               sir.Lock()
-               defer sir.Unlock()
-
-               sir.hot, sir.standby = sir.standby, sir.hot
-               go func() {
-                       <-time.After(time.Hour)
-                       sir.Lock()
-                       defer sir.Unlock()
-                       err = sir.standby.Close()
-                       if err != nil {
-                               sir.l.Error().Msg("fail to close standby series 
index")
-                       }
-                       location := filepath.Clean(sir.opts.Location)
-                       root := filepath.Join(location, sir.standby.name)
-                       err = os.RemoveAll(root)
+func (sic *seriesIndexController[T, O]) run(deadline time.Time) (err error) {
+       var standby *seriesIndex
+       ctx := context.WithValue(context.Background(), logger.ContextKey, sic.l)
+       _, err = sic.loadIdx()
+       if err != nil {
+               sic.l.Warn().Err(err).Msg("fail to clear redundant series 
index")
+       }
+       if sic.hot.startTime.Before(deadline) {
+               sic.l.Info().Time("deadline", deadline).Msg("start to swap 
series index")
+               sic.Lock()
+               if sic.standby == nil {
+                       sic.standby, err = sic.newIdx(ctx)
                        if err != nil {
-                               sir.l.Error().Msg("fail to remove expired 
standby directory")
+                               sic.Unlock()
+                               return err
                        }
-                       sir.standby = nil
-               }()
-
-               startTime := standard(time.Now(), sir.opts.TTL.Unit)
-               endTime := startTime.Add(sir.opts.TTL.estimatedDuration())
-               sir.TimeRange = timestamp.NewSectionTimeRange(startTime, 
endTime)
+               }
+               standby = sic.hot
+               sic.hot = sic.standby
+               sic.standby = nil
+               sic.Unlock()
+               err = standby.Close()
+               if err != nil {
+                       sic.l.Warn().Err(err).Msg("fail to close standby series 
index")
+               }
+               lfs.MustRMAll(standby.path)
+               sic.l.Info().Str("path", standby.path).Msg("dropped series 
index")
+               lfs.SyncPath(sic.location)
        }
-       if sir.End.Sub(deadline) < time.Hour {
-               location := filepath.Clean(sir.opts.Location)
-               path := fmt.Sprintf("idx-%016x", time.Now().UnixNano())
-               sir.standby, err = newSeriesIndex(sir.ctx, location, path, 
sir.opts.SeriesIndexFlushTimeoutSeconds)
+
+       liveTime := sic.hot.startTime.Sub(deadline)
+       if liveTime > 0 && liveTime < sic.standbyLiveTime {
+               sic.l.Info().Time("deadline", deadline).Msg("start to create 
standby series index")
+               standby, err = sic.newIdx(ctx)
                if err != nil {
                        return err
                }
+               sic.Lock()
+               sic.standby = standby
+               sic.Unlock()
        }
        return nil
 }
 
-func (sir *seriesIndexController[T, O]) Write(docs index.Documents) error {
-       sir.Lock()
-       defer sir.Unlock()
-       if sir.standby != nil {
-               err := sir.standby.Write(docs)
-               if err != nil {
-                       sir.l.Error().Msg("fail to write docs in standby series 
index")
-               }
+func (sic *seriesIndexController[T, O]) Write(docs index.Documents) error {
+       sic.RLock()
+       defer sic.RUnlock()
+       if sic.standby != nil {
+               return sic.standby.Write(docs)
        }
-       return sir.hot.Write(docs)
+       return sic.hot.Write(docs)
 }
 
-func (sir *seriesIndexController[T, O]) searchPrimary(ctx context.Context, 
series *pbv1.Series) (pbv1.SeriesList, error) {
-       sir.RLock()
-       defer sir.RUnlock()
+func (sic *seriesIndexController[T, O]) searchPrimary(ctx context.Context, 
series *pbv1.Series) (pbv1.SeriesList, error) {
+       sic.RLock()
+       defer sic.RUnlock()
 
-       sl, err := sir.hot.searchPrimary(ctx, series)
+       sl, err := sic.hot.searchPrimary(ctx, series)
        if err != nil {
                return nil, err
        }
-       if len(sl) > 0 || sir.standby == nil {
+       if len(sl) > 0 || sic.standby == nil {
                return sl, nil
        }
-       return sir.standby.searchPrimary(ctx, series)
+       return sic.standby.searchPrimary(ctx, series)
 }
 
-func (sir *seriesIndexController[T, O]) Search(ctx context.Context, series 
*pbv1.Series, filter index.Filter, order *pbv1.OrderBy) (pbv1.SeriesList, 
error) {
-       sir.RLock()
-       defer sir.RUnlock()
+func (sic *seriesIndexController[T, O]) Search(ctx context.Context, series 
*pbv1.Series,
+       filter index.Filter, order *pbv1.OrderBy, preloadSize int,
+) (pbv1.SeriesList, error) {
+       sic.RLock()
+       defer sic.RUnlock()
 
-       sl, err := sir.hot.Search(ctx, series, filter, order)
+       sl, err := sic.hot.Search(ctx, series, filter, order, preloadSize)
        if err != nil {
                return nil, err
        }
-       if len(sl) > 0 || sir.standby == nil {
+       if len(sl) > 0 || sic.standby == nil {
                return sl, nil
        }
-       return sir.standby.Search(ctx, series, filter, order)
+       return sic.standby.Search(ctx, series, filter, order, preloadSize)
 }
 
-func (sir *seriesIndexController[T, O]) Close() error {
-       sir.Lock()
-       defer sir.Unlock()
-       if sir.standby != nil {
-               err := sir.standby.Close()
-               if err != nil {
-                       return err
-               }
+func (sic *seriesIndexController[T, O]) Close() error {
+       sic.Lock()
+       defer sic.Unlock()
+       if sic.standby != nil {
+               return multierr.Combine(sic.hot.Close(), sic.standby.Close())
        }
-       return sir.hot.Close()
+       return sic.hot.Close()
 }
diff --git a/banyand/internal/storage/index_test.go 
b/banyand/internal/storage/index_test.go
index 066f3b00..a7adc370 100644
--- a/banyand/internal/storage/index_test.go
+++ b/banyand/internal/storage/index_test.go
@@ -20,7 +20,10 @@ package storage
 import (
        "context"
        "fmt"
+       "os"
+       "path"
        "testing"
+       "time"
 
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
@@ -38,7 +41,7 @@ var testSeriesPool pbv1.SeriesPool
 func TestSeriesIndex_Primary(t *testing.T) {
        ctx := context.Background()
        path, fn := setUp(require.New(t))
-       si, err := newSeriesIndex(ctx, path, "idx", 0)
+       si, err := newSeriesIndex(ctx, path, time.Now(), 0)
        require.NoError(t, err)
        defer func() {
                require.NoError(t, si.Close())
@@ -69,7 +72,7 @@ func TestSeriesIndex_Primary(t *testing.T) {
        require.NoError(t, si.Write(docs))
        // Restart the index
        require.NoError(t, si.Close())
-       si, err = newSeriesIndex(ctx, path, "idx", 0)
+       si, err = newSeriesIndex(ctx, path, time.Now(), 0)
        require.NoError(t, err)
        tests := []struct {
                name         string
@@ -140,3 +143,82 @@ func setUp(t *require.Assertions) (tempDir string, 
deferFunc func()) {
        tempDir, deferFunc = test.Space(t)
        return tempDir, deferFunc
 }
+
+func TestSeriesIndexController(t *testing.T) {
+       ttl := IntervalRule{
+               Unit: DAY,
+               Num:  3,
+       }
+       t.Run("Test setup", func(t *testing.T) {
+               ctx := context.Background()
+               tmpDir, dfFn, err := test.NewSpace()
+               require.NoError(t, err)
+               defer dfFn()
+
+               opts := TSDBOpts[TSTable, any]{
+                       Location: tmpDir,
+                       TTL:      ttl,
+               }
+
+               sic, err := newSeriesIndexController(ctx, opts)
+               assert.NoError(t, err)
+               assert.NotNil(t, sic)
+               idxNames := make([]string, 0)
+               walkDir(tmpDir, "idx-", func(suffix string) error {
+                       idxNames = append(idxNames, suffix)
+                       return nil
+               })
+               assert.Equal(t, 1, len(idxNames))
+               require.NoError(t, sic.Close())
+               sic, err = newSeriesIndexController(ctx, opts)
+               assert.NoError(t, err)
+               assert.NotNil(t, sic)
+               idxNames = idxNames[:0]
+               walkDir(tmpDir, "idx-", func(suffix string) error {
+                       idxNames = append(idxNames, suffix)
+                       return nil
+               })
+               assert.Equal(t, 1, len(idxNames))
+               require.NoError(t, sic.Close())
+
+               require.NoError(t, os.MkdirAll(path.Join(tmpDir, 
fmt.Sprintf("idx-%016x", time.Now().UnixNano()-20000)), 0o755))
+               require.NoError(t, os.MkdirAll(path.Join(tmpDir, 
fmt.Sprintf("idx-%016x", time.Now().UnixNano()-10000)), 0o755))
+               sic, err = newSeriesIndexController(ctx, opts)
+               assert.NoError(t, err)
+               assert.NotNil(t, sic)
+               idxNames = idxNames[:0]
+               walkDir(tmpDir, "idx-", func(suffix string) error {
+                       idxNames = append(idxNames, suffix)
+                       return nil
+               })
+               assert.Equal(t, 2, len(idxNames))
+               require.NoError(t, sic.Close())
+       })
+
+       t.Run("Test retention", func(t *testing.T) {
+               ctx := context.Background()
+               tmpDir, dfFn, err := test.NewSpace()
+               require.NoError(t, err)
+               defer dfFn()
+
+               opts := TSDBOpts[TSTable, any]{
+                       Location: tmpDir,
+                       TTL:      ttl,
+               }
+               sic, err := newSeriesIndexController(ctx, opts)
+               require.NoError(t, err)
+               defer sic.Close()
+               require.NoError(t, sic.run(time.Now().Add(-time.Hour*23)))
+               assert.NotNil(t, sic.standby)
+               idxNames := make([]string, 0)
+               walkDir(tmpDir, "idx-", func(suffix string) error {
+                       idxNames = append(idxNames, suffix)
+                       return nil
+               })
+               assert.Equal(t, 2, len(idxNames))
+               nextTime := sic.standby.startTime
+               require.NoError(t, sic.run(time.Now().Add(time.Hour)))
+               assert.Nil(t, sic.standby)
+               assert.Equal(t, nextTime, sic.hot.startTime)
+       })
+}
diff --git a/banyand/internal/storage/retention.go 
b/banyand/internal/storage/retention.go
index af9cef05..445b7f56 100644
--- a/banyand/internal/storage/retention.go
+++ b/banyand/internal/storage/retention.go
@@ -51,12 +51,18 @@ func newRetentionTask[T TSTable, O any](database 
*database[T, O], ttl IntervalRu
 }
 
 func (rc *retentionTask[T, O]) run(now time.Time, l *logger.Logger) bool {
-       for _, shard := range rc.database.sLst {
-               if err := 
shard.segmentController.remove(now.Add(-rc.duration)); err != nil {
+       var shardList []*shard[T, O]
+       rc.database.RLock()
+       shardList = append(shardList, rc.database.sLst...)
+       rc.database.RUnlock()
+       deadline := now.Add(-rc.duration)
+
+       for _, shard := range shardList {
+               if err := shard.segmentController.remove(deadline); err != nil {
                        l.Error().Err(err)
                }
        }
-       if err := rc.database.indexController.run(now.Add(-rc.duration)); err 
!= nil {
+       if err := rc.database.indexController.run(deadline); err != nil {
                l.Error().Err(err)
        }
        return true
diff --git a/banyand/internal/storage/segment.go 
b/banyand/internal/storage/segment.go
index 2436a6f4..22890ddc 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -71,7 +71,7 @@ func openSegment[T TSTable](ctx context.Context, startTime, 
endTime time.Time, p
        l := logger.Fetch(ctx, s.String())
        s.l = l
        clock, _ := timestamp.GetClock(ctx)
-       s.Reporter = bucket.NewTimeBasedReporter(fmt.Sprintf("Shard-%s-%s", 
p.Shard, s.String()), timeRange, clock, scheduler)
+       s.Reporter = bucket.NewTimeBasedReporter(fmt.Sprintf("%s-%s", p.Shard, 
s.String()), timeRange, clock, scheduler)
        return s, nil
 }
 
@@ -181,7 +181,7 @@ func (sc *segmentController[T, O]) segments() (ss 
[]*segment[T]) {
 }
 
 func (sc *segmentController[T, O]) Current() (bucket.Reporter, error) {
-       now := sc.Standard(sc.clock.Now())
+       now := sc.segmentSize.Unit.standard(sc.clock.Now())
        ns := uint64(now.UnixNano())
        if b := func() bucket.Reporter {
                sc.RLock()
@@ -222,16 +222,6 @@ func (sc *segmentController[T, O]) OnMove(prev 
bucket.Reporter, next bucket.Repo
        event.Msg("move to the next segment")
 }
 
-func (sc *segmentController[T, O]) Standard(t time.Time) time.Time {
-       switch sc.segmentSize.Unit {
-       case HOUR:
-               return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 
0, t.Location())
-       case DAY:
-               return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, 
t.Location())
-       }
-       panic("invalid interval unit")
-}
-
 func (sc *segmentController[T, O]) Format(tm time.Time) string {
        switch sc.segmentSize.Unit {
        case HOUR:
@@ -282,7 +272,7 @@ func (sc *segmentController[T, O]) open() error {
 func (sc *segmentController[T, O]) create(start time.Time) (*segment[T], 
error) {
        sc.Lock()
        defer sc.Unlock()
-       start = sc.Standard(start)
+       start = sc.segmentSize.Unit.standard(start)
        var next *segment[T]
        for _, s := range sc.lst {
                if s.Contains(uint64(start.UnixNano())) {
diff --git a/banyand/internal/storage/storage.go 
b/banyand/internal/storage/storage.go
index bce8c8b7..79431224 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -114,6 +114,16 @@ func (iu IntervalUnit) String() string {
        panic("invalid interval unit")
 }
 
+func (iu IntervalUnit) standard(t time.Time) time.Time {
+       switch iu {
+       case HOUR:
+               return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 
0, t.Location())
+       case DAY:
+               return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, 
t.Location())
+       }
+       panic("invalid interval unit")
+}
+
 // IntervalRule defines a length of two points in time.
 type IntervalRule struct {
        Unit IntervalUnit
diff --git a/pkg/fs/file_system.go b/pkg/fs/file_system.go
index 09fe1de1..a2ed0950 100644
--- a/pkg/fs/file_system.go
+++ b/pkg/fs/file_system.go
@@ -20,7 +20,6 @@ package fs
 
 import (
        "io"
-       "os"
 
        "github.com/apache/skywalking-banyandb/pkg/logger"
 )
@@ -88,8 +87,6 @@ type File interface {
        Size() (int64, error)
        // Returns the absolute path of the file.
        Path() string
-       // Clear file content
-       Clear() error
        // Close File.
        Close() error
 }
@@ -105,7 +102,7 @@ type FileSystem interface {
        // ReadDir reads the directory named by dirname and returns a list of 
directory entries sorted by filename.
        ReadDir(dirname string) []DirEntry
        // Create and open the file by specified name and mode.
-       CreateFile(name string, flag int, permission Mode) (File, error)
+       CreateFile(name string, permission Mode) (File, error)
        // Create and open lock file by specified name and mode.
        CreateLockFile(name string, permission Mode) (File, error)
        // Open the file by specified name and mode.
@@ -135,7 +132,7 @@ type DirEntry interface {
 
 // MustCreateFile creates a new file with the specified name and permission.
 func MustCreateFile(fs FileSystem, path string, permission Mode) File {
-       f, err := fs.CreateFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 
permission)
+       f, err := fs.CreateFile(path, permission)
        if err != nil {
                logger.GetLogger().Panic().Err(err).Str("path", 
path).Msg("cannot create file")
        }
diff --git a/pkg/fs/local_file_system.go b/pkg/fs/local_file_system.go
index 3a59ae24..b943a6f1 100644
--- a/pkg/fs/local_file_system.go
+++ b/pkg/fs/local_file_system.go
@@ -126,8 +126,8 @@ func (fs *localFileSystem) ReadDir(dirname string) 
[]DirEntry {
 }
 
 // CreateFile is used to create and open the file by specified name and mode.
-func (fs *localFileSystem) CreateFile(name string, flag int, permission Mode) 
(File, error) {
-       file, err := os.OpenFile(name, flag, os.FileMode(permission))
+func (fs *localFileSystem) CreateFile(name string, permission Mode) (File, 
error) {
+       file, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 
os.FileMode(permission))
        switch {
        case err == nil:
                return &LocalFile{
@@ -417,19 +417,6 @@ func (file *LocalFile) Close() error {
        return nil
 }
 
-// Clear file content.
-func (file *LocalFile) Clear() error {
-       err := file.file.Truncate(0)
-       if err != nil {
-               return err
-       }
-       _, err = file.file.Seek(0, 0)
-       if err != nil {
-               return err
-       }
-       return nil
-}
-
 type seqReader struct {
        reader   *bufio.Reader
        fileName string
diff --git a/pkg/fs/local_file_system_test.go b/pkg/fs/local_file_system_test.go
index 77858d06..eaf278a5 100644
--- a/pkg/fs/local_file_system_test.go
+++ b/pkg/fs/local_file_system_test.go
@@ -46,7 +46,7 @@ var _ = ginkgo.Describe("Loacl File System", func() {
                        fs = NewLocalFileSystem()
                        err := os.MkdirAll(dirName, 0o777)
                        gomega.Expect(err).ToNot(gomega.HaveOccurred())
-                       file, err = fs.CreateFile(fileName, 
os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0o777)
+                       file, err = fs.CreateFile(fileName, 0o777)
                        gomega.Expect(err).ToNot(gomega.HaveOccurred())
                        _, err = os.Stat(fileName)
                        gomega.Expect(err).ToNot(gomega.HaveOccurred())

Reply via email to