This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 96fbbc65 Add snapshot time retention policy and fix lifecycle panic
issue (#950)
96fbbc65 is described below
commit 96fbbc65409763d0cc5b8c37339618aa89177c02
Author: mrproliu <[email protected]>
AuthorDate: Mon Jan 19 21:37:18 2026 +0800
Add snapshot time retention policy and fix lifecycle panic issue (#950)
* Add snapshot time retention policy and fix lifecycle panic issue
---
CHANGES.md | 2 +
.../backup/lifecycle/trace_migration_visitor.go | 4 +
banyand/internal/storage/snapshot.go | 36 ++++-
banyand/internal/storage/snapshot_test.go | 164 ++++++++++++++++++++-
banyand/measure/snapshot.go | 4 +-
banyand/measure/svc_data.go | 6 +-
banyand/measure/svc_standalone.go | 2 +
banyand/property/listener.go | 4 +-
banyand/property/service.go | 4 +-
banyand/stream/snapshot.go | 4 +-
banyand/stream/svc_standalone.go | 4 +-
banyand/trace/svc_standalone.go | 8 +-
pkg/fs/file_system.go | 2 +
pkg/fs/local_file_system.go | 5 +
14 files changed, 232 insertions(+), 17 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 3f096083..3e70e8d2 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -17,6 +17,7 @@ Release Notes.
- Persist series metadata in liaison queue for measure, stream and trace
models.
- Update the dump tool to support analyzing the parts with smeta files.
- Activate the property repair mechanism by default.
+- Add a snapshot time retention policy to ensure a snapshot can only be
deleted after it reaches the configured minimum age.
### Bug Fixes
@@ -27,6 +28,7 @@ Release Notes.
- Fix unsupported empty string tag bug.
- Fix duplicate elements in stream query results by implementing element
ID-based deduplication across scan, merge, and result building stages.
- Fix data written to the wrong shard and related stream queries.
+- Fix the lifecycle panic when the trace has no sidx.
### Document
diff --git a/banyand/backup/lifecycle/trace_migration_visitor.go
b/banyand/backup/lifecycle/trace_migration_visitor.go
index 3b767a86..2936d7c1 100644
--- a/banyand/backup/lifecycle/trace_migration_visitor.go
+++ b/banyand/backup/lifecycle/trace_migration_visitor.go
@@ -289,6 +289,10 @@ func (mv *traceMigrationVisitor) generateAllSidxPartData(
sourceShardID common.ShardID,
sidxPath string,
) ([]queue.StreamingPartData, []func(), error) {
+ // If the sidx directory does not exist, there is nothing to migrate, so skip it.
+ if !mv.lfs.IsExist(sidxPath) {
+ return nil, nil, nil
+ }
// Sidx structure: sidx/{index-name}/{part-id}/files
// Find all index directories in the sidx directory
entries := mv.lfs.ReadDir(sidxPath)
diff --git a/banyand/internal/storage/snapshot.go
b/banyand/internal/storage/snapshot.go
index ff2fc615..e5cdd2c6 100644
--- a/banyand/internal/storage/snapshot.go
+++ b/banyand/internal/storage/snapshot.go
@@ -18,16 +18,34 @@
package storage
import (
+ "fmt"
"os"
"path/filepath"
"sort"
"time"
"github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
)
+// SnapshotTimeFormat is the time layout of the timestamp prefix in snapshot directory names.
+const SnapshotTimeFormat = "20060102150405"
+
+// ParseSnapshotTimestamp extracts the creation time from a snapshot directory
name.
+func ParseSnapshotTimestamp(name string) (time.Time, error) {
+ if len(name) < 14 {
+ return time.Time{}, fmt.Errorf("snapshot name too short: %s",
name)
+ }
+ timestampStr := name[:14]
+ parsedTime, parseErr := time.Parse(SnapshotTimeFormat, timestampStr)
+ if parseErr != nil {
+ return time.Time{}, fmt.Errorf("failed to parse timestamp from
snapshot name %s: %w", name, parseErr)
+ }
+ return parsedTime, nil
+}
+
// DeleteStaleSnapshots deletes the stale snapshots in the root directory.
-func DeleteStaleSnapshots(root string, maxNum int, lfs fs.FileSystem) {
+func DeleteStaleSnapshots(root string, maxNum int, minAge time.Duration, lfs
fs.FileSystem) {
if maxNum <= 0 {
return
}
@@ -40,8 +58,22 @@ func DeleteStaleSnapshots(root string, maxNum int, lfs
fs.FileSystem) {
sort.Slice(snapshots, func(i, j int) bool {
return snapshots[i].Name() < snapshots[j].Name()
})
+ now := time.Now()
for i := 0; i < len(snapshots)-maxNum; i++ {
- lfs.MustRMAll(filepath.Join(root, snapshots[i].Name()))
+ snapshotName := snapshots[i].Name()
+ // If the min age is not set, fall back to deleting purely by the max
number of snapshots.
+ if minAge == 0 {
+ lfs.MustRMAll(filepath.Join(root, snapshotName))
+ continue
+ }
+ snapshotTime, parseErr := ParseSnapshotTimestamp(snapshotName)
+ if parseErr != nil {
+ logger.GetLogger().Warn().Err(parseErr).Str("snapshot",
snapshotName).Msg("failed to parse snapshot timestamp, skipping")
+ continue
+ }
+ if now.Sub(snapshotTime) >= minAge {
+ lfs.MustRMAll(filepath.Join(root, snapshotName))
+ }
}
}
diff --git a/banyand/internal/storage/snapshot_test.go
b/banyand/internal/storage/snapshot_test.go
index b3b4c0da..e4045903 100644
--- a/banyand/internal/storage/snapshot_test.go
+++ b/banyand/internal/storage/snapshot_test.go
@@ -18,9 +18,11 @@
package storage
import (
+ "fmt"
"path/filepath"
"sort"
"testing"
+ "time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -29,7 +31,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/test"
)
-func TestDeleteStaleSnapshots(t *testing.T) {
+func TestDeleteStaleSnapshotsWithCount(t *testing.T) {
fileSystem := fs.NewLocalFileSystem()
tmpPath, deferFn := test.Space(require.New(t))
defer deferFn()
@@ -48,7 +50,7 @@ func TestDeleteStaleSnapshots(t *testing.T) {
fileSystem.MkdirIfNotExist(dirPath, 0o755)
}
- DeleteStaleSnapshots(snapshotsRoot, 2, fileSystem)
+ DeleteStaleSnapshots(snapshotsRoot, 2, 0, fileSystem)
remaining := fileSystem.ReadDir(snapshotsRoot)
require.Equal(t, 2, len(remaining))
@@ -60,3 +62,161 @@ func TestDeleteStaleSnapshots(t *testing.T) {
sort.Strings(names)
assert.Equal(t, []string{"20201010101010-00000003",
"20201010101010-00000004"}, names)
}
+
+func TestParseSnapshotTimestamp(t *testing.T) {
+ tests := []struct {
+ expectTime time.Time
+ name string
+ snapshotDir string
+ expectErr bool
+ }{
+ {
+ name: "valid snapshot name",
+ snapshotDir: "20201010101010-00000001",
+ expectErr: false,
+ expectTime: time.Date(2020, 10, 10, 10, 10, 10, 0,
time.UTC),
+ },
+ {
+ name: "another valid snapshot",
+ snapshotDir: "20231225153045-12345678",
+ expectErr: false,
+ expectTime: time.Date(2023, 12, 25, 15, 30, 45, 0,
time.UTC),
+ },
+ {
+ name: "snapshot name too short",
+ snapshotDir: "2020101010",
+ expectErr: true,
+ },
+ {
+ name: "invalid timestamp format",
+ snapshotDir: "abcd1010101010-00000001",
+ expectErr: true,
+ },
+ {
+ name: "empty name",
+ snapshotDir: "",
+ expectErr: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ parsedTime, parseErr :=
ParseSnapshotTimestamp(tt.snapshotDir)
+ if tt.expectErr {
+ require.Error(t, parseErr)
+ } else {
+ require.NoError(t, parseErr)
+ assert.Equal(t, tt.expectTime, parsedTime)
+ }
+ })
+ }
+}
+
+func TestDeleteStaleSnapshotsWithMinAge(t *testing.T) {
+ tests := []struct {
+ name string
+ snapshotAges []time.Duration
+ maxNum int
+ minAge time.Duration
+ expectedRemaining int
+ oldestDeletedAge time.Duration
+ validateOldestDeleted bool
+ }{
+ {
+ name: "all snapshots within age threshold
- no deletion",
+ maxNum: 2,
+ minAge: 1 * time.Hour,
+ snapshotAges: []time.Duration{-30 * time.Minute,
-45 * time.Minute, -50 * time.Minute},
+ expectedRemaining: 3,
+ },
+ {
+ name: "count exceeded and old
snapshots exist - delete old only",
+ maxNum: 2,
+ minAge: 1 * time.Hour,
+ snapshotAges: []time.Duration{-3 * time.Hour,
-2 * time.Hour, -30 * time.Minute},
+ expectedRemaining: 2,
+ validateOldestDeleted: true,
+ oldestDeletedAge: -3 * time.Hour,
+ },
+ {
+ name: "count not exceeded - no deletion
despite old snapshots",
+ maxNum: 5,
+ minAge: 1 * time.Hour,
+ snapshotAges: []time.Duration{-3 * time.Hour, -2 *
time.Hour, -30 * time.Minute},
+ expectedRemaining: 3,
+ },
+ {
+ name: "all snapshots old and count
exceeded - delete to max",
+ maxNum: 2,
+ minAge: 1 * time.Hour,
+ snapshotAges: []time.Duration{-5 * time.Hour,
-4 * time.Hour, -3 * time.Hour, -2 * time.Hour},
+ expectedRemaining: 2,
+ validateOldestDeleted: true,
+ oldestDeletedAge: -5 * time.Hour,
+ },
+ {
+ name: "mixed ages with high max - keep
all",
+ maxNum: 10,
+ minAge: 1 * time.Hour,
+ snapshotAges: []time.Duration{-5 * time.Hour, -2 *
time.Hour, -30 * time.Minute, -10 * time.Minute},
+ expectedRemaining: 4,
+ },
+ {
+ name: "boundary case - snapshot at
threshold is deleted",
+ maxNum: 1,
+ minAge: 1 * time.Hour,
+ snapshotAges: []time.Duration{-1*time.Hour -
1*time.Minute, -59 * time.Minute},
+ expectedRemaining: 1,
+ validateOldestDeleted: true,
+ oldestDeletedAge: -1*time.Hour - 1*time.Minute,
+ },
+ {
+ name: "large number of old snapshots",
+ maxNum: 3,
+ minAge: 30 * time.Minute,
+ snapshotAges: []time.Duration{-5 * time.Hour,
-4 * time.Hour, -3 * time.Hour, -2 * time.Hour, -1 * time.Hour, -45 *
time.Minute, -20 * time.Minute},
+ expectedRemaining: 3,
+ validateOldestDeleted: true,
+ oldestDeletedAge: -5 * time.Hour,
+ },
+ {
+ name: "partial deletion - some candidates
too young",
+ maxNum: 2,
+ minAge: 2 * time.Hour,
+ snapshotAges: []time.Duration{-5 * time.Hour,
-1*time.Hour - 30*time.Minute, -45 * time.Minute, -30 * time.Minute},
+ expectedRemaining: 3,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ fileSystem := fs.NewLocalFileSystem()
+ tmpPath, deferFn := test.Space(require.New(t))
+ defer deferFn()
+ snapshotsRoot := filepath.Join(tmpPath, "snapshots")
+ fileSystem.MkdirIfNotExist(snapshotsRoot, 0o755)
+ now := time.Now().UTC()
+ createdSnapshots := make(map[string]time.Time)
+ for idx, age := range tt.snapshotAges {
+ snapshotTime := now.Add(age)
+ snapshotName := fmt.Sprintf("%s-%08d",
snapshotTime.Format(SnapshotTimeFormat), idx+1)
+ dirPath := filepath.Join(snapshotsRoot,
snapshotName)
+ fileSystem.MkdirIfNotExist(dirPath, 0o755)
+ createdSnapshots[snapshotName] = snapshotTime
+ }
+ DeleteStaleSnapshots(snapshotsRoot, tt.maxNum,
tt.minAge, fileSystem)
+ remaining := fileSystem.ReadDir(snapshotsRoot)
+ require.Equal(t, tt.expectedRemaining, len(remaining),
"unexpected number of remaining snapshots")
+ if tt.validateOldestDeleted {
+ oldestDeletedTime :=
now.Add(tt.oldestDeletedAge)
+ oldestDeletedName := fmt.Sprintf("%s-%08d",
oldestDeletedTime.Format(SnapshotTimeFormat), 1)
+ found := false
+ for _, info := range remaining {
+ if info.Name() == oldestDeletedName {
+ found = true
+ break
+ }
+ }
+ require.False(t, found, "oldest snapshot %s
should have been deleted", oldestDeletedName)
+ }
+ })
+ }
+}
diff --git a/banyand/measure/snapshot.go b/banyand/measure/snapshot.go
index 4389d7c0..71ccb244 100644
--- a/banyand/measure/snapshot.go
+++ b/banyand/measure/snapshot.go
@@ -359,7 +359,7 @@ func (s *snapshotListener) Rev(ctx context.Context, message
bus.Message) bus.Mes
}
s.snapshotMux.Lock()
defer s.snapshotMux.Unlock()
- storage.DeleteStaleSnapshots(s.s.snapshotDir, s.s.maxFileSnapshotNum,
s.s.lfs)
+ storage.DeleteStaleSnapshots(s.s.snapshotDir, s.s.maxFileSnapshotNum,
s.s.minFileSnapshotAge, s.s.lfs)
sn := s.snapshotName()
var err error
for _, g := range gg {
@@ -392,5 +392,5 @@ func (s *snapshotListener) Rev(ctx context.Context, message
bus.Message) bus.Mes
func (s *snapshotListener) snapshotName() string {
s.snapshotSeq++
- return fmt.Sprintf("%s-%08X",
time.Now().UTC().Format("20060102150405"), s.snapshotSeq)
+ return fmt.Sprintf("%s-%08X",
time.Now().UTC().Format(storage.SnapshotTimeFormat), s.snapshotSeq)
}
diff --git a/banyand/measure/svc_data.go b/banyand/measure/svc_data.go
index e7a5a0b3..fee99941 100644
--- a/banyand/measure/svc_data.go
+++ b/banyand/measure/svc_data.go
@@ -75,6 +75,7 @@ type dataSVC struct {
retentionConfig storage.RetentionConfig
cc storage.CacheConfig
maxFileSnapshotNum int
+ minFileSnapshotAge time.Duration
}
func (s *dataSVC) Measure(metadata *commonv1.Metadata) (Measure, error) {
@@ -178,6 +179,7 @@ func (s *dataSVC) FlagSet() *run.FlagSet {
"enable forced retention cleanup when disk usage exceeds high
watermark")
flagS.IntVar(&s.maxFileSnapshotNum, "measure-max-file-snapshot-num",
10, "the maximum number of file snapshots allowed")
+ flagS.DurationVar(&s.minFileSnapshotAge,
"measure-min-file-snapshot-age", time.Hour, "minimum age for file snapshots to
be eligible for deletion")
s.cc.MaxCacheSize = run.Bytes(100 * 1024 * 1024)
flagS.VarP(&s.cc.MaxCacheSize, "service-cache-max-size", "", "maximum
service cache size (e.g., 100M)")
flagS.DurationVar(&s.cc.CleanupInterval,
"service-cache-cleanup-interval", 30*time.Second, "service cache cleanup
interval")
@@ -481,7 +483,7 @@ func (d *dataSnapshotListener) Rev(ctx context.Context,
message bus.Message) bus
}
d.snapshotMux.Lock()
defer d.snapshotMux.Unlock()
- storage.DeleteStaleSnapshots(d.s.snapshotDir, d.s.maxFileSnapshotNum,
d.s.lfs)
+ storage.DeleteStaleSnapshots(d.s.snapshotDir, d.s.maxFileSnapshotNum,
d.s.minFileSnapshotAge, d.s.lfs)
sn := d.snapshotName()
var err error
for _, g := range gg {
@@ -509,7 +511,7 @@ func (d *dataSnapshotListener) Rev(ctx context.Context,
message bus.Message) bus
func (d *dataSnapshotListener) snapshotName() string {
d.snapshotSeq++
- return fmt.Sprintf("%s-%08X",
time.Now().UTC().Format("20060102150405"), d.snapshotSeq)
+ return fmt.Sprintf("%s-%08X",
time.Now().UTC().Format(storage.SnapshotTimeFormat), d.snapshotSeq)
}
type dataDeleteStreamSegmentsListener struct {
diff --git a/banyand/measure/svc_standalone.go
b/banyand/measure/svc_standalone.go
index 84064a2e..1c444380 100644
--- a/banyand/measure/svc_standalone.go
+++ b/banyand/measure/svc_standalone.go
@@ -79,6 +79,7 @@ type standalone struct {
retentionConfig storage.RetentionConfig
cc storage.CacheConfig
maxFileSnapshotNum int
+ minFileSnapshotAge time.Duration
}
func (s *standalone) Measure(metadata *commonv1.Metadata) (Measure, error) {
@@ -182,6 +183,7 @@ func (s *standalone) FlagSet() *run.FlagSet {
"enable forced retention cleanup when disk usage exceeds high
watermark")
flagS.IntVar(&s.maxFileSnapshotNum, "measure-max-file-snapshot-num",
10, "the maximum number of file snapshots allowed")
+ flagS.DurationVar(&s.minFileSnapshotAge,
"measure-min-file-snapshot-age", time.Hour, "minimum age for file snapshots to
be eligible for deletion")
s.cc.MaxCacheSize = run.Bytes(100 * 1024 * 1024)
flagS.VarP(&s.cc.MaxCacheSize, "service-cache-max-size", "", "maximum
service cache size (e.g., 100M)")
flagS.DurationVar(&s.cc.CleanupInterval,
"service-cache-cleanup-interval", 30*time.Second, "service cache cleanup
interval")
diff --git a/banyand/property/listener.go b/banyand/property/listener.go
index c381eaac..bfb13d03 100644
--- a/banyand/property/listener.go
+++ b/banyand/property/listener.go
@@ -232,7 +232,7 @@ func (s *snapshotListener) Rev(ctx context.Context, message
bus.Message) bus.Mes
}
s.snapshotMux.Lock()
defer s.snapshotMux.Unlock()
- storage.DeleteStaleSnapshots(s.s.snapshotDir, s.s.maxFileSnapshotNum,
s.s.lfs)
+ storage.DeleteStaleSnapshots(s.s.snapshotDir, s.s.maxFileSnapshotNum,
s.s.minFileSnapshotAge, s.s.lfs)
sn := s.snapshotName()
shardsRef := s.s.db.sLst.Load()
if shardsRef == nil {
@@ -266,7 +266,7 @@ func (s *snapshotListener) Rev(ctx context.Context, message
bus.Message) bus.Mes
func (s *snapshotListener) snapshotName() string {
s.snapshotSeq++
- return fmt.Sprintf("%s-%08X",
time.Now().UTC().Format("20060102150405"), s.snapshotSeq)
+ return fmt.Sprintf("%s-%08X",
time.Now().UTC().Format(storage.SnapshotTimeFormat), s.snapshotSeq)
}
type repairListener struct {
diff --git a/banyand/property/service.go b/banyand/property/service.go
index b3be9b1d..db239866 100644
--- a/banyand/property/service.go
+++ b/banyand/property/service.go
@@ -76,6 +76,7 @@ type service struct {
repairTreeSlotCount int
maxDiskUsagePercent int
maxFileSnapshotNum int
+ minFileSnapshotAge time.Duration
repairEnabled bool
}
@@ -84,7 +85,8 @@ func (s *service) FlagSet() *run.FlagSet {
flagS.StringVar(&s.root, "property-root-path", "/tmp", "the root path
of database")
flagS.DurationVar(&s.flushTimeout, "property-flush-timeout",
defaultFlushTimeout, "the memory data timeout of measure")
flagS.IntVar(&s.maxDiskUsagePercent, "property-max-disk-usage-percent",
95, "the maximum disk usage percentage allowed")
- flagS.IntVar(&s.maxFileSnapshotNum, "property-max-file-snapshot-num",
2, "the maximum number of file snapshots allowed")
+ flagS.IntVar(&s.maxFileSnapshotNum, "property-max-file-snapshot-num",
10, "the maximum number of file snapshots allowed")
+ flagS.DurationVar(&s.minFileSnapshotAge,
"property-min-file-snapshot-age", time.Hour, "the minimum age for file
snapshots to be eligible for deletion")
flagS.DurationVar(&s.expireTimeout, "property-expire-delete-timeout",
time.Hour*24*7, "the duration of the expired data needs to be deleted")
flagS.IntVar(&s.repairTreeSlotCount, "property-repair-tree-slot-count",
32, "the slot count of the repair tree")
flagS.StringVar(&s.repairBuildTreeCron,
"property-repair-build-tree-cron", "@every 1h", "the cron expression for
repairing the build tree")
diff --git a/banyand/stream/snapshot.go b/banyand/stream/snapshot.go
index 41008a5f..6e45a786 100644
--- a/banyand/stream/snapshot.go
+++ b/banyand/stream/snapshot.go
@@ -410,7 +410,7 @@ func (s *snapshotListener) Rev(ctx context.Context, message
bus.Message) bus.Mes
}
s.snapshotMux.Lock()
defer s.snapshotMux.Unlock()
- storage.DeleteStaleSnapshots(s.s.snapshotDir, s.s.maxFileSnapshotNum,
s.s.lfs)
+ storage.DeleteStaleSnapshots(s.s.snapshotDir, s.s.maxFileSnapshotNum,
s.s.minFileSnapshotAge, s.s.lfs)
sn := s.snapshotName()
var err error
for _, g := range gg {
@@ -443,5 +443,5 @@ func (s *snapshotListener) Rev(ctx context.Context, message
bus.Message) bus.Mes
func (s *snapshotListener) snapshotName() string {
s.snapshotSeq++
- return fmt.Sprintf("%s-%08X",
time.Now().UTC().Format("20060102150405"), s.snapshotSeq)
+ return fmt.Sprintf("%s-%08X",
time.Now().UTC().Format(storage.SnapshotTimeFormat), s.snapshotSeq)
}
diff --git a/banyand/stream/svc_standalone.go b/banyand/stream/svc_standalone.go
index cc0dfc46..1be24bf7 100644
--- a/banyand/stream/svc_standalone.go
+++ b/banyand/stream/svc_standalone.go
@@ -78,6 +78,7 @@ type standalone struct {
option option
retentionConfig storage.RetentionConfig
maxFileSnapshotNum int
+ minFileSnapshotAge time.Duration
}
func (s *standalone) Stream(metadata *commonv1.Metadata) (Stream, error) {
@@ -181,7 +182,8 @@ func (s *standalone) FlagSet() *run.FlagSet {
flagS.BoolVar(&s.retentionConfig.ForceCleanupEnabled,
"stream-retention-force-cleanup-enabled", false,
"enable forced retention cleanup when disk usage exceeds high
watermark")
- flagS.IntVar(&s.maxFileSnapshotNum, "stream-max-file-snapshot-num", 2,
"the maximum number of file snapshots allowed")
+ flagS.IntVar(&s.maxFileSnapshotNum, "stream-max-file-snapshot-num", 10,
"the maximum number of file snapshots allowed")
+ flagS.DurationVar(&s.minFileSnapshotAge,
"stream-min-file-snapshot-age", time.Hour, "minimum age for file snapshots to
be eligible for deletion")
return flagS
}
diff --git a/banyand/trace/svc_standalone.go b/banyand/trace/svc_standalone.go
index 6902cab5..ec893bbf 100644
--- a/banyand/trace/svc_standalone.go
+++ b/banyand/trace/svc_standalone.go
@@ -74,6 +74,7 @@ type standalone struct {
option option
retentionConfig storage.RetentionConfig
maxFileSnapshotNum int
+ minFileSnapshotAge time.Duration
}
func (s *standalone) FlagSet() *run.FlagSet {
@@ -90,7 +91,8 @@ func (s *standalone) FlagSet() *run.FlagSet {
fs.BoolVar(&s.retentionConfig.ForceCleanupEnabled,
"trace-retention-force-cleanup-enabled", false,
"enable forced retention cleanup when disk usage exceeds high
watermark")
- fs.IntVar(&s.maxFileSnapshotNum, "trace-max-file-snapshot-num", 2, "the
maximum number of file snapshots")
+ fs.IntVar(&s.maxFileSnapshotNum, "trace-max-file-snapshot-num", 10,
"the maximum number of file snapshots")
+ fs.DurationVar(&s.minFileSnapshotAge, "trace-min-file-snapshot-age",
time.Hour, "minimum age for file snapshots to be eligible for deletion")
s.option.mergePolicy = newDefaultMergePolicy()
fs.VarP(&s.option.mergePolicy.maxFanOutSize, "trace-max-fan-out-size",
"", "the upper bound of a single file size after merge of trace")
// Additional flags can be added here
@@ -446,7 +448,7 @@ func (d *standaloneSnapshotListener) Rev(ctx
context.Context, message bus.Messag
}
d.snapshotMux.Lock()
defer d.snapshotMux.Unlock()
- storage.DeleteStaleSnapshots(d.s.snapshotDir, d.s.maxFileSnapshotNum,
d.s.lfs)
+ storage.DeleteStaleSnapshots(d.s.snapshotDir, d.s.maxFileSnapshotNum,
d.s.minFileSnapshotAge, d.s.lfs)
sn := d.snapshotName()
var err error
for _, g := range gg {
@@ -479,7 +481,7 @@ func (d *standaloneSnapshotListener) Rev(ctx
context.Context, message bus.Messag
func (d *standaloneSnapshotListener) snapshotName() string {
d.snapshotSeq++
- return fmt.Sprintf("%s-%08X",
time.Now().UTC().Format("20060102150405"), d.snapshotSeq)
+ return fmt.Sprintf("%s-%08X",
time.Now().UTC().Format(storage.SnapshotTimeFormat), d.snapshotSeq)
}
type standaloneDeleteTraceSegmentsListener struct {
diff --git a/pkg/fs/file_system.go b/pkg/fs/file_system.go
index 57d46083..9bd585e8 100644
--- a/pkg/fs/file_system.go
+++ b/pkg/fs/file_system.go
@@ -124,6 +124,8 @@ type FileSystem interface {
MustGetTotalSpace(path string) uint64
// CreateHardLink creates hard links in destPath for files in srcPath
that pass the filter.
CreateHardLink(srcPath, destPath string, filter func(string) bool) error
+ // IsExist checks if the directory/file exists or not.
+ IsExist(path string) bool
}
// DirEntry is the interface that wraps the basic information about a file or
directory.
diff --git a/pkg/fs/local_file_system.go b/pkg/fs/local_file_system.go
index 451f46c1..f025653f 100644
--- a/pkg/fs/local_file_system.go
+++ b/pkg/fs/local_file_system.go
@@ -309,6 +309,11 @@ func (fs *localFileSystem) MustGetTotalSpace(path string)
uint64 {
return usage.Total
}
+func (fs *localFileSystem) IsExist(path string) bool {
+ _, err := os.Stat(path)
+ return err == nil
+}
+
func (fs *localFileSystem) CreateHardLink(srcPath, destPath string, filter
func(string) bool) error {
fi, err := os.Stat(srcPath)
if err != nil {