This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 310432329 Fix backup container OOM and speed up GCS snapshot uploads (#1181)
310432329 is described below

commit 31043232977d147578e581511239bf24811d6f81
Author: mrproliu <[email protected]>
AuthorDate: Wed Jun 17 22:18:36 2026 +0800

    Fix backup container OOM and speed up GCS snapshot uploads (#1181)
---
 CHANGES.md                                         |   2 +
 banyand/backup/backup.go                           | 158 +++++++++++++++------
 banyand/backup/backup_test.go                      |  81 ++++++++++-
 docs/operation/backup.md                           |   1 +
 pkg/fs/remote/checksum/interface.go                |   3 +
 pkg/fs/remote/checksum/sha256.go                   |   8 ++
 .../checksum/{interface.go => sha256_test.go}      |  41 ++++--
 pkg/fs/remote/gcp/gcs.go                           |  64 ++++++++-
 8 files changed, 301 insertions(+), 57 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 3bdeb2623..a254f3fc2 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -66,6 +66,7 @@ Release Notes.
 - Snapshot/backup and data inspection no longer reopen idle-closed segments, 
avoiding cold-segment nil-index panics and index lock-file churn.
 - Add opt-in vectorized measure query tracing over raw-frame distributed 
queries, including a trace envelope and fixed trace-label vocabulary.
 - Enhance segment lifecycle: `refCount` now counts only active users, 
decoupled from "open" (`index != nil`), adding a "dormant" state (open, 
`refCount == 0`). A `DecRef` to zero no longer closes a segment; idle reclaim 
and retention delete act only at `refCount == 0`, so an in-flight 
snapshot/inspect is no longer torn down mid-operation (fixing the cold-node 
nil-index panic and bluge lock churn) while idle segments still release their 
bluge writers.
+- Speed up GCS backup uploads: write each object and its checksum metadata in one request, dropping the per-object `Update` round-trip.
 - Lifecycle migration now archives rows whose measure/stream schema was 
deleted from the registry, instead of aborting the group.
 
 ### Bug Fixes
@@ -117,6 +118,7 @@ Release Notes.
 - Fix FODC agent labeling metrics with `node_role="ROLE_UNSPECIFIED"`. The 
agent resolved the node role exactly once at startup via a single 
`GetCurrentNode` poll whose endpoint retries spanned only ~1s; when the sibling 
lifecycle/banyandb gRPC server was not yet listening (`connect: cannot assign 
requested address`) the role fell back to `ROLE_UNSPECIFIED` permanently, so 
most nodes never reported their real `ROLE_DATA`/`ROLE_LIAISON`. Retry the 
initial node-role resolution with exponen [...]
 - Fix lifecycle row-replay OOM on large measure parts by streaming the dump 
reader, pooling size-classed marshal buffers, and bounding in-flight batch 
bytes (default 32 MiB); peak heap drops ~1.5 GB→~296 MB.
 - Consolidate lifecycle migration report errors into a single flat list of 
structured, stage-aware entries.
+- Fix backup container OOM from overlapping scheduled runs; serialize runs and upload small snapshot files concurrently.
 
 ### Document
 
diff --git a/banyand/backup/backup.go b/banyand/backup/backup.go
index 9fd35cbe1..68ac063ea 100644
--- a/banyand/backup/backup.go
+++ b/banyand/backup/backup.go
@@ -28,6 +28,7 @@ import (
        "path"
        "path/filepath"
        "strings"
+       "sync/atomic"
        "syscall"
        "time"
 
@@ -35,6 +36,7 @@ import (
        "github.com/robfig/cron/v3"
        "github.com/spf13/cobra"
        "go.uber.org/multierr"
+       "golang.org/x/sync/errgroup"
 
        "github.com/apache/skywalking-banyandb/banyand/backup/snapshot"
        cfg "github.com/apache/skywalking-banyandb/pkg/config"
@@ -50,20 +52,28 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/version"
 )
 
+// smallFileThreshold is the size below which files are uploaded concurrently.
+// Backup snapshots are dominated by tiny index files whose upload cost is bound
+// by per-request latency rather than bandwidth, so parallelizing them brings
+// the overall backup well within the schedule interval. Larger files are uploaded
+// sequentially to keep the concurrent write-buffer memory bounded.
+const smallFileThreshold = 5 << 20 // 5 MiB
+
 type backupOptions struct {
-       fsConfig     remoteconfig.FsConfig
-       gRPCAddr     string
-       cert         string
-       timeStyle    string
-       schedule     string
-       streamRoot   string
-       measureRoot  string
-       propertyRoot string
-       traceRoot    string
-       schemaRoot   string
-       dest         string
-       enableTLS    bool
-       insecure     bool
+       fsConfig          remoteconfig.FsConfig
+       gRPCAddr          string
+       cert              string
+       timeStyle         string
+       schedule          string
+       streamRoot        string
+       measureRoot       string
+       propertyRoot      string
+       traceRoot         string
+       schemaRoot        string
+       dest              string
+       uploadConcurrency int
+       enableTLS         bool
+       insecure          bool
 }
 
 // NewBackupCommand creates a new backup command.
@@ -94,7 +104,19 @@ func NewBackupCommand() *cobra.Command {
                        schedLogger.Info().Msgf("backup to %s will run with 
schedule: %s", backupOpts.dest, backupOpts.schedule)
                        clockInstance := clock.New()
                        sch := timestamp.NewScheduler(schedLogger, 
clockInstance)
+                       // A full backup may legitimately run longer than the schedule interval.
+                       // The scheduler abandons (but does not cancel) an action that exceeds its
+                       // internal timeout, so without this guard a slow run would overlap with the
+                       // next scheduled run, stacking concurrent uploads until the process is
+                       // OOM-killed. backupInFlight ensures only one backup runs at a time: a tick
+                       // that fires while the previous run is still in progress is skipped.
+                       var backupInFlight atomic.Bool
                        err := sch.Register(cmd.Context(), "backup", 
cron.Descriptor, backupOpts.schedule, func(ctx context.Context, _ time.Time, l 
*logger.Logger) bool {
+                               if !backupInFlight.CompareAndSwap(false, true) {
+                                       l.Warn().Msg("previous backup is still running; skipping this scheduled run")
+                                       return true
+                               }
+                               defer backupInFlight.Store(false)
                                err := backupAction(ctx, backupOpts)
                                if err != nil {
                                        l.Error().Err(err).Msg("backup failed")
@@ -130,6 +152,7 @@ func NewBackupCommand() *cobra.Command {
        cmd.Flags().StringVar(&backupOpts.schemaRoot, "schema-root-path", 
"/tmp", "Root directory for schema property catalog")
        cmd.Flags().StringVar(&backupOpts.dest, "dest", "", "Destination URL 
(e.g., file:///backups)")
        cmd.Flags().StringVar(&backupOpts.timeStyle, "time-style", "daily", 
"Time directory style (daily|hourly)")
+       cmd.Flags().IntVar(&backupOpts.uploadConcurrency, "upload-concurrency", 8, "Number of concurrent uploads for small files (<5MiB)")
        cmd.Flags().StringVar(
                &backupOpts.schedule,
                "schedule",
@@ -198,7 +221,7 @@ func backupAction(ctx context.Context, options 
backupOptions) error {
                if strings.HasPrefix(snp.Name, 
snapshot.SchemaPropertyCatalogName+"/") {
                        catalogName = snapshot.SchemaPropertyCatalogName
                }
-               multierr.AppendInto(&err, backupSnapshot(ctx, fs, snapshotDir, 
catalogName, timeDir))
+               multierr.AppendInto(&err, backupSnapshot(ctx, fs, snapshotDir, catalogName, timeDir, options.uploadConcurrency))
        }
        return err
 }
@@ -233,28 +256,92 @@ func getTimeDir(style string) string {
        }
 }
 
-func backupSnapshot(ctx context.Context, fs remote.FS, snapshotDir, catalog, 
timeDir string) error {
-       localFiles, err := getAllFiles(snapshotDir)
+func backupSnapshot(ctx context.Context, fs remote.FS, snapshotDir, catalog, timeDir string, concurrency int) error {
+       prefix := path.Join(timeDir, catalog)
+
+       remoteFiles, err := fs.List(ctx, prefix+"/")
        if err != nil {
                return err
        }
+       // Build a set of existing remote files for O(1) lookups. The local file list
+       // is never materialized; instead the snapshot is walked file by file so memory
+       // stays bounded by the remote file count rather than the (much larger) local
+       // file count.
+       remoteSet := make(map[string]struct{}, len(remoteFiles))
+       for _, remoteFile := range remoteFiles {
+               remoteSet[remoteFile] = struct{}{}
+       }
 
-       remotePrefix := path.Join(timeDir, catalog) + "/"
-
-       remoteFiles, err := fs.List(ctx, remotePrefix)
-       if err != nil {
-               return err
+       if concurrency < 1 {
+               concurrency = 1
        }
-       for _, relPath := range localFiles {
-               remotePath := path.Join(timeDir, catalog, relPath)
-               if !contains(remoteFiles, remotePath) {
-                       if err := uploadFile(ctx, fs, snapshotDir, relPath, 
remotePath); err != nil {
-                               return err
-                       }
+       // A dedicated cancellable context lets us stop in-flight uploads as soon as the
+       // walk fails, instead of letting them run to completion for a doomed backup.
+       uploadCtx, cancelUploads := context.WithCancel(ctx)
+       defer cancelUploads()
+       g, gctx := errgroup.WithContext(uploadCtx)
+       g.SetLimit(concurrency)
+
+       walkErr := filepath.Walk(snapshotDir, func(filePath string, info os.FileInfo, iterErr error) error {
+               if iterErr != nil {
+                       return iterErr
+               }
+               if info.IsDir() {
+                       return nil
+               }
+               // Stop walking promptly if a concurrent upload has already failed.
+               if gctx.Err() != nil {
+                       return gctx.Err()
+               }
+               relPath, relErr := filepath.Rel(snapshotDir, filePath)
+               if relErr != nil {
+                       return relErr
+               }
+               relPath = filepath.ToSlash(relPath)
+               remotePath := path.Join(prefix, relPath)
+               if _, ok := remoteSet[remotePath]; ok {
+                       // Present both locally and remotely: keep it and drop it from the
+                       // set so that whatever remains is exactly the orphaned remote files.
+                       delete(remoteSet, remotePath)
+                       return nil
                }
+               if info.Size() < smallFileThreshold {
+                       // Small files dominate and are latency-bound: upload them concurrently.
+                       // relPath/remotePath are per-callback locals, so capturing them is safe.
+                       // g.Go blocks once the limit is reached, providing natural backpressure.
+                       g.Go(func() error {
+                               return uploadFile(gctx, fs, snapshotDir, relPath, remotePath)
+                       })
+                       return nil
+               }
+               // Large files are uploaded sequentially so at most one large write buffer
+               // is held at a time, keeping peak memory bounded.
+               return uploadFile(gctx, fs, snapshotDir, relPath, remotePath)
+       })
+       if walkErr != nil {
+               // The backup is already failing, so stop in-flight uploads rather than
+               // letting them run to completion.
+               cancelUploads()
+       }
+       waitErr := g.Wait()
+       // A real upload failure takes priority; a context.Canceled that we induced via
+       // cancelUploads above is not itself a reportable error.
+       if waitErr != nil && !errors.Is(waitErr, context.Canceled) {
+               return waitErr
+       }
+       if walkErr != nil {
+               return walkErr
+       }
+       if waitErr != nil {
+               return waitErr
        }
 
-       deleteOrphanedFiles(ctx, fs, localFiles, remoteFiles, timeDir, catalog)
+       // Remaining entries exist remotely but no longer locally: delete them.
+       for orphan := range remoteSet {
+               if delErr := fs.Delete(ctx, orphan); delErr != nil {
+                       logger.Warningf("Warning: failed to delete orphaned file %s: %v\n", orphan, delErr)
+               }
+       }
        return nil
 }
 
@@ -287,21 +374,6 @@ func uploadFile(ctx context.Context, fs remote.FS, 
snapshotDir, relPath, remoteP
        return fs.Upload(ctx, remotePath, file)
 }
 
-func deleteOrphanedFiles(ctx context.Context, fs remote.FS, localFiles, 
remoteFiles []string, timeDir, snapshotName string) {
-       expected := make(map[string]struct{})
-       for _, f := range localFiles {
-               expected[path.Join(timeDir, snapshotName, f)] = struct{}{}
-       }
-
-       for _, remoteFile := range remoteFiles {
-               if _, exists := expected[remoteFile]; !exists {
-                       if err := fs.Delete(ctx, remoteFile); err != nil {
-                               logger.Warningf("Warning: failed to delete 
orphaned file %s: %v\n", remoteFile, err)
-                       }
-               }
-       }
-}
-
 func contains(slice []string, s string) bool {
        for _, item := range slice {
                if item == s {
diff --git a/banyand/backup/backup_test.go b/banyand/backup/backup_test.go
index f5c533dac..1b5ccafd7 100644
--- a/banyand/backup/backup_test.go
+++ b/banyand/backup/backup_test.go
@@ -19,10 +19,13 @@ package backup
 
 import (
        "context"
+       "fmt"
        "io"
        "os"
        "path"
        "path/filepath"
+       "strings"
+       "sync"
        "testing"
        "time"
 
@@ -158,8 +161,10 @@ func TestGetAllFiles(t *testing.T) {
 }
 
 type mockFS struct {
-       uploaded []string
-       deleted  []string
+       uploadErrOn string
+       uploaded    []string
+       deleted     []string
+       mu          sync.Mutex
 }
 
 func (m *mockFS) List(_ context.Context, prefix string) ([]string, error) {
@@ -167,11 +172,18 @@ func (m *mockFS) List(_ context.Context, prefix string) 
([]string, error) {
 }
 
 func (m *mockFS) Upload(_ context.Context, p string, _ io.Reader) error {
+       if m.uploadErrOn != "" && strings.Contains(p, m.uploadErrOn) {
+               return fmt.Errorf("mock upload failure for %s", p)
+       }
+       m.mu.Lock()
+       defer m.mu.Unlock()
        m.uploaded = append(m.uploaded, p)
        return nil
 }
 
 func (m *mockFS) Delete(_ context.Context, p string) error {
+       m.mu.Lock()
+       defer m.mu.Unlock()
        m.deleted = append(m.deleted, p)
        return nil
 }
@@ -185,7 +197,7 @@ func TestBackupSnapshot(t *testing.T) {
        os.WriteFile(filepath.Join(tmpDir, "newfile.txt"), nil, 0o600)
 
        m := &mockFS{}
-       err := backupSnapshot(context.Background(), m, tmpDir, "test-snapshot", 
"daily")
+       err := backupSnapshot(context.Background(), m, tmpDir, "test-snapshot", "daily", 4)
        if err != nil {
                t.Fatal(err)
        }
@@ -201,6 +213,69 @@ func TestBackupSnapshot(t *testing.T) {
        }
 }
 
+// TestBackupSnapshotConcurrent exercises the concurrent small-file path, the
+// sequential large-file path (>= smallFileThreshold), and orphan deletion all
+// at once. Run with -race to catch data races in the upload fan-out.
+func TestBackupSnapshotConcurrent(t *testing.T) {
+       tmpDir := t.TempDir()
+       const numSmall = 50
+       for i := 0; i < numSmall; i++ {
+               sub := filepath.Join(tmpDir, fmt.Sprintf("seg-%d", i%5))
+               if err := os.MkdirAll(sub, 0o750); err != nil {
+                       t.Fatal(err)
+               }
+               if err := os.WriteFile(filepath.Join(sub, fmt.Sprintf("f-%d.tm", i)), []byte("x"), 0o600); err != nil {
+                       t.Fatal(err)
+               }
+       }
+       // A file at exactly smallFileThreshold takes the sequential branch (size is
+       // not strictly less than the threshold).
+       if err := os.WriteFile(filepath.Join(tmpDir, "big.bin"), make([]byte, smallFileThreshold), 0o600); err != nil {
+               t.Fatal(err)
+       }
+
+       m := &mockFS{}
+       if err := backupSnapshot(context.Background(), m, tmpDir, "test-snapshot", "daily", 8); err != nil {
+               t.Fatal(err)
+       }
+
+       if len(m.uploaded) != numSmall+1 {
+               t.Fatalf("uploaded %d files, want %d", len(m.uploaded), numSmall+1)
+       }
+       uploaded := make(map[string]struct{}, len(m.uploaded))
+       for _, p := range m.uploaded {
+               uploaded[p] = struct{}{}
+       }
+       if _, ok := uploaded["daily/test-snapshot/big.bin"]; !ok {
+               t.Errorf("large file not uploaded; uploaded=%v", m.uploaded)
+       }
+       wantDelete := "daily/test-snapshot/existing.txt"
+       if len(m.deleted) != 1 || m.deleted[0] != wantDelete {
+               t.Errorf("deleted = %v, want [%s]", m.deleted, wantDelete)
+       }
+}
+
+// TestBackupSnapshotUploadError verifies that a failed upload surfaces an error
+// and that orphaned remote files are NOT deleted when the backup did not fully
+// succeed.
+func TestBackupSnapshotUploadError(t *testing.T) {
+       tmpDir := t.TempDir()
+       for _, name := range []string{"a.tm", "b.tm", "boom.tm"} {
+               if err := os.WriteFile(filepath.Join(tmpDir, name), []byte("x"), 0o600); err != nil {
+                       t.Fatal(err)
+               }
+       }
+
+       m := &mockFS{uploadErrOn: "boom.tm"}
+       err := backupSnapshot(context.Background(), m, tmpDir, "test-snapshot", "daily", 4)
+       if err == nil {
+               t.Fatal("expected an error when an upload fails, got nil")
+       }
+       if len(m.deleted) != 0 {
+               t.Errorf("orphans must not be deleted on a failed backup, deleted = %v", m.deleted)
+       }
+}
+
 func TestContains(t *testing.T) {
        tests := []struct {
                s     string
diff --git a/docs/operation/backup.md b/docs/operation/backup.md
index 5a4489042..c69b28b3a 100644
--- a/docs/operation/backup.md
+++ b/docs/operation/backup.md
@@ -110,6 +110,7 @@ When a schedule is provided, the tool:
 | `--trace-root-path`               | Root directory for the trace catalog 
snapshots.                                                                      
        | `/tmp`            |
 | `--time-style`                    | Directory naming style based on time 
(`daily` or `hourly`)                                                           
        | `daily`           |
 | `--schedule`                      | Schedule expression for periodic backup. 
Options: `@yearly`, `@monthly`, `@weekly`, `@daily`, `@hourly`, `@every 
<duration>` | _empty_           |
+| `--upload-concurrency`            | Number of concurrent uploads for small snapshot files (<5MiB).                                                               | `8`               |
 | `--logging-level`                 | Root logging level (`debug`, `info`, 
`warn`, `error`)                                                                
        | `info`            |
 | `--logging-env`                   | Logging environment (`dev` or `prod`)    
                                                                                
    | `prod`            |
 | **AWS S3 specific**               |                                          
                                                                                
    |                   |
diff --git a/pkg/fs/remote/checksum/interface.go 
b/pkg/fs/remote/checksum/interface.go
index c36184910..d91d909cb 100644
--- a/pkg/fs/remote/checksum/interface.go
+++ b/pkg/fs/remote/checksum/interface.go
@@ -29,4 +29,7 @@ type Verifier interface {
 
        // ComputeAndWrap returns a reader that computes the checksum while 
reading.
        ComputeAndWrap(r io.Reader) (wrappedReader io.Reader, getHash func() 
(string, error))
+
+       // Sum reads r to completion and returns the hex-encoded checksum of its contents.
+       Sum(r io.Reader) (string, error)
 }
diff --git a/pkg/fs/remote/checksum/sha256.go b/pkg/fs/remote/checksum/sha256.go
index a286fd574..bd534238f 100644
--- a/pkg/fs/remote/checksum/sha256.go
+++ b/pkg/fs/remote/checksum/sha256.go
@@ -79,6 +79,14 @@ func (v *sha256Verifier) ComputeAndWrap(r io.Reader) 
(io.Reader, func() (string,
        return hr, hr.getHash
 }
 
+func (v *sha256Verifier) Sum(r io.Reader) (string, error) {
+       wrapped, getHash := v.ComputeAndWrap(r)
+       if _, err := io.Copy(io.Discard, wrapped); err != nil {
+               return "", err
+       }
+       return getHash()
+}
+
 func (v *sha256Verifier) Wrap(rc io.ReadCloser, expected string) io.ReadCloser 
{
        h := sha256.New()
        return &verifyingReadCloser{
diff --git a/pkg/fs/remote/checksum/interface.go 
b/pkg/fs/remote/checksum/sha256_test.go
similarity index 51%
copy from pkg/fs/remote/checksum/interface.go
copy to pkg/fs/remote/checksum/sha256_test.go
index c36184910..40fd66040 100644
--- a/pkg/fs/remote/checksum/interface.go
+++ b/pkg/fs/remote/checksum/sha256_test.go
@@ -15,18 +15,39 @@
 // specific language governing permissions and limitations
 // under the License.
 
-// Package checksum provides functions for computing checksums algorithms and 
verifying.
 package checksum
 
-import "io"
+import (
+       "io"
+       "strings"
+       "testing"
+)
 
-// Verifier defines the interface for computing and verifying checksums.
-type Verifier interface {
-       // Wrap returns an io.ReadCloser that transparently verifies the 
checksum
-       // when the returned reader is closed. This enables streaming 
verification
-       // without buffering the entire content in memory.
-       Wrap(io.ReadCloser, string) io.ReadCloser
+func TestSum(t *testing.T) {
+       v, err := DefaultSHA256Verifier()
+       if err != nil {
+               t.Fatal(err)
+       }
+       // Well-known SHA-256 of "hello".
+       const want = "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"
+       got, err := v.Sum(strings.NewReader("hello"))
+       if err != nil {
+               t.Fatalf("Sum: %v", err)
+       }
+       if got != want {
+               t.Errorf("Sum = %s, want %s", got, want)
+       }
 
-       // ComputeAndWrap returns a reader that computes the checksum while 
reading.
-       ComputeAndWrap(r io.Reader) (wrappedReader io.Reader, getHash func() 
(string, error))
+       // Sum must agree with the streaming ComputeAndWrap path on the same input.
+       wrapped, getHash := v.ComputeAndWrap(strings.NewReader("hello"))
+       if _, err = io.Copy(io.Discard, wrapped); err != nil {
+               t.Fatal(err)
+       }
+       streamed, err := getHash()
+       if err != nil {
+               t.Fatal(err)
+       }
+       if streamed != got {
+               t.Errorf("Sum (%s) disagrees with ComputeAndWrap (%s)", got, streamed)
+       }
 }
diff --git a/pkg/fs/remote/gcp/gcs.go b/pkg/fs/remote/gcp/gcs.go
index ecdaf9120..234d05b3c 100644
--- a/pkg/fs/remote/gcp/gcs.go
+++ b/pkg/fs/remote/gcp/gcs.go
@@ -41,6 +41,12 @@ import (
 
 const checksumSha256Key = "checksum_sha256"
 
+// singleRequestChunkThreshold is the object size below which the GCS writer is
+// configured for a single-request (non-resumable) upload. Objects smaller than
+// one default chunk gain nothing from resumable chunking, and a single request
+// avoids the extra round-trip of establishing a resumable session.
+const singleRequestChunkThreshold = 16 << 20 // 16 MiB
+
 var _ remote.FS = (*gcsFS)(nil)
 
 // gcsFS implements remote.FS backed by Google Cloud Storage.
@@ -144,9 +150,21 @@ func (g *gcsFS) Upload(ctx context.Context, p string, data 
io.Reader) error {
 
        objPath := g.getFullPath(p)
        logger.Infof("GCS Upload: bucket=%s, path=%s, fullPath=%s", g.bucket, 
p, objPath)
+
+       // When the source is seekable (e.g. a local *os.File, as used by the backup
+       // tool), compute the checksum in a first pass, then upload it together with the
+       // object in a single request. This halves the per-object round-trips by removing
+       // the follow-up metadata Update call, which dominates backup time for the many
+       // tiny snapshot files.
+       if seeker, ok := data.(io.ReadSeeker); ok && os.Getenv("STORAGE_EMULATOR_HOST") == "" {
+               return g.uploadSeekable(ctx, objPath, seeker)
+       }
+
        wrappedReader, getHash := g.verifier.ComputeAndWrap(data)
 
-       w := g.client.Bucket(g.bucket).Object(objPath).NewWriter(ctx)
+       // Size is unknown for a non-seekable stream, so the default resumable upload
+       // is kept.
+       w := g.newWriter(ctx, objPath, -1)
        if _, err := io.Copy(w, wrappedReader); err != nil {
                _ = w.Close()
                return fmt.Errorf("failed to write object: %w", err)
@@ -176,6 +194,50 @@ func (g *gcsFS) Upload(ctx context.Context, p string, data 
io.Reader) error {
        return nil
 }
 
+// newWriter creates an object writer. When the object size is known and smaller
+// than one default chunk, it switches to a single-request (non-resumable) upload
+// to avoid the extra round-trip of establishing a resumable session. A negative
+// size means the size is unknown (e.g. a non-seekable stream), keeping the
+// default resumable behavior.
+func (g *gcsFS) newWriter(ctx context.Context, objPath string, size int64) *storage.Writer {
+       w := g.client.Bucket(g.bucket).Object(objPath).NewWriter(ctx)
+       if size >= 0 && size < singleRequestChunkThreshold {
+               w.ChunkSize = 0
+       }
+       return w
+}
+
+// uploadSeekable uploads a seekable source in a single request: it computes the
+// checksum in a first pass, rewinds, then writes the object with the checksum
+// already attached as metadata, avoiding the follow-up metadata Update call.
+func (g *gcsFS) uploadSeekable(ctx context.Context, objPath string, seeker io.ReadSeeker) error {
+       size, err := seeker.Seek(0, io.SeekEnd)
+       if err != nil {
+               return fmt.Errorf("failed to size object: %w", err)
+       }
+       if _, err = seeker.Seek(0, io.SeekStart); err != nil {
+               return fmt.Errorf("failed to rewind object: %w", err)
+       }
+       hash, err := g.verifier.Sum(seeker)
+       if err != nil {
+               return fmt.Errorf("failed to compute hash: %w", err)
+       }
+       if _, err = seeker.Seek(0, io.SeekStart); err != nil {
+               return fmt.Errorf("failed to rewind object: %w", err)
+       }
+
+       w := g.newWriter(ctx, objPath, size)
+       w.Metadata = map[string]string{checksumSha256Key: hash}
+       if _, err = io.Copy(w, seeker); err != nil {
+               _ = w.Close()
+               return fmt.Errorf("failed to write object: %w", err)
+       }
+       if err = w.Close(); err != nil {
+               return fmt.Errorf("failed to close writer: %w", err)
+       }
+       return nil
+}
+
 func (g *gcsFS) Download(ctx context.Context, p string) (io.ReadCloser, error) 
{
        if g.verifier == nil {
                return nil, fmt.Errorf("verifier not initialized")

Reply via email to