mrproliu commented on code in PR #1181:
URL:
https://github.com/apache/skywalking-banyandb/pull/1181#discussion_r3427200441
##########
banyand/backup/backup.go:
##########
@@ -233,28 +256,78 @@ func getTimeDir(style string) string {
}
}
-func backupSnapshot(ctx context.Context, fs remote.FS, snapshotDir, catalog,
timeDir string) error {
- localFiles, err := getAllFiles(snapshotDir)
+func backupSnapshot(ctx context.Context, fs remote.FS, snapshotDir, catalog,
timeDir string, concurrency int) error {
+ prefix := path.Join(timeDir, catalog)
+
+ remoteFiles, err := fs.List(ctx, prefix+"/")
if err != nil {
return err
}
+ // Build a set of existing remote files for O(1) lookups. The local
file list
+ // is never materialized; instead the snapshot is walked file by file
so memory
+ // stays bounded by the remote file count rather than the (much larger)
local
+ // file count.
+ remoteSet := make(map[string]struct{}, len(remoteFiles))
+ for _, remoteFile := range remoteFiles {
+ remoteSet[remoteFile] = struct{}{}
+ }
- remotePrefix := path.Join(timeDir, catalog) + "/"
-
- remoteFiles, err := fs.List(ctx, remotePrefix)
- if err != nil {
- return err
+ if concurrency < 1 {
+ concurrency = 1
}
- for _, relPath := range localFiles {
- remotePath := path.Join(timeDir, catalog, relPath)
- if !contains(remoteFiles, remotePath) {
- if err := uploadFile(ctx, fs, snapshotDir, relPath,
remotePath); err != nil {
- return err
- }
+ g, gctx := errgroup.WithContext(ctx)
+ g.SetLimit(concurrency)
+
+ walkErr := filepath.Walk(snapshotDir, func(filePath string, info
os.FileInfo, iterErr error) error {
+ if iterErr != nil {
+ return iterErr
+ }
+ if info.IsDir() {
+ return nil
+ }
+ // Stop walking promptly if a concurrent upload has already
failed.
+ if gctx.Err() != nil {
+ return gctx.Err()
+ }
+ relPath, relErr := filepath.Rel(snapshotDir, filePath)
+ if relErr != nil {
+ return relErr
+ }
+ relPath = filepath.ToSlash(relPath)
+ remotePath := path.Join(prefix, relPath)
+ if _, ok := remoteSet[remotePath]; ok {
+ // Present both locally and remotely: keep it and drop
it from the
+ // set so that whatever remains is exactly the orphaned
remote files.
+ delete(remoteSet, remotePath)
+ return nil
+ }
+ if info.Size() < smallFileThreshold {
+ // Small files dominate and are latency-bound: upload
them concurrently.
+ // relPath/remotePath are per-callback locals, so
capturing them is safe.
+ // g.Go blocks once the limit is reached, providing
natural backpressure.
+ g.Go(func() error {
+ return uploadFile(gctx, fs, snapshotDir,
relPath, remotePath)
+ })
+ return nil
}
+ // Large files are uploaded sequentially so at most one large
write buffer
+ // is held at a time, keeping peak memory bounded.
+ return uploadFile(gctx, fs, snapshotDir, relPath, remotePath)
+ })
+ // Always wait for in-flight uploads before returning, even on a walk
error.
+ if waitErr := g.Wait(); waitErr != nil {
+ return waitErr
+ }
+ if walkErr != nil {
+ return walkErr
Review Comment:
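Use `context.WithCancel` so that a walk error also cancels the in-flight uploads. Currently `gctx` is only cancelled when an upload fails, so after a walk error `g.Wait()` still blocks until every pending upload finishes.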
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]