This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new f1a08a23 fix(io): handle leak for error cases of open blob (#688)
f1a08a23 is described below

commit f1a08a235284202097d19fa07ffcfbe8f9889523
Author: ferhat elmas <[email protected]>
AuthorDate: Thu Jan 22 18:24:48 2026 +0100

    fix(io): handle leak for error cases of open blob (#688)
    
    related to #644, #681
    
    Signed-off-by: ferhat elmas <[email protected]>
---
 io/blob.go      | 30 +++++++++---------
 io/blob_test.go | 94 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 108 insertions(+), 16 deletions(-)

diff --git a/io/blob.go b/io/blob.go
index bff385fe..8fae7be5 100644
--- a/io/blob.go
+++ b/io/blob.go
@@ -42,26 +42,20 @@ type blobOpenFile struct {
        ctx       context.Context
 }
 
-func (f *blobOpenFile) ReadAt(p []byte, off int64) (int, error) {
-       rdr, err := f.b.Bucket.NewRangeReader(f.ctx, f.key, off, int64(len(p)), 
nil)
-       if err != nil {
-               return 0, err
+func (f *blobOpenFile) ReadAt(p []byte, off int64) (n int, err error) {
+       var rdr io.ReadCloser
+       if f.b.newRangeReader != nil {
+               rdr, err = f.b.newRangeReader(f.ctx, f.key, off, int64(len(p)))
+       } else {
+               rdr, err = f.b.Bucket.NewRangeReader(f.ctx, f.key, off, 
int64(len(p)), nil)
        }
-
-       // ensure the buffer is read, or EOF is reached for this read of this 
"chunk"
-       // given we are using offsets to read this block, it is constrained by 
size of 'p'
-       size, err := io.ReadFull(rdr, p)
        if err != nil {
-               if errors.Is(err, io.EOF) {
-                       return size, err
-               }
-               // check we are at the end of the underlying file
-               if off+int64(size) > f.Size() {
-                       return size, err
-               }
+               return 0, err
        }
+       // not using internal.CheckedClose due to import cycle
+       defer func() { err = errors.Join(err, rdr.Close()) }()
 
-       return size, rdr.Close()
+       return io.ReadFull(rdr, p)
 }
 
 // Functions to implement the `Stat()` function in the `io/fs.File` interface
@@ -99,6 +93,10 @@ type blobFileIO struct {
 
        keyExtractor KeyExtractor
        ctx          context.Context
+
+       // newRangeReader is an optional hook for testing.
+       // It allows injecting a mock reader to verify Close calls.
+       newRangeReader func(ctx context.Context, key string, offset, length 
int64) (io.ReadCloser, error)
 }
 
 func (bfs *blobFileIO) preprocess(path string) (string, error) {
diff --git a/io/blob_test.go b/io/blob_test.go
index 32b45553..fe614b67 100644
--- a/io/blob_test.go
+++ b/io/blob_test.go
@@ -18,9 +18,13 @@
 package io
 
 import (
+       "context"
+       "io"
        "testing"
 
        "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+       "gocloud.dev/blob/memblob"
 )
 
 func TestDefaultKeyExtractor(t *testing.T) {
@@ -76,3 +80,93 @@ func TestDefaultKeyExtractor(t *testing.T) {
                })
        }
 }
+
+type trackingReadCloser struct {
+       io.ReadCloser
+       closed *bool
+}
+
+func (t *trackingReadCloser) Close() error {
+       *t.closed = true
+
+       return t.ReadCloser.Close()
+}
+
+func TestReadAtResourceCleanup(t *testing.T) {
+       ctx := context.Background()
+
+       bucket := memblob.OpenBucket(nil)
+       defer bucket.Close()
+
+       content := []byte("short")
+       err := bucket.WriteAll(ctx, "test-file", content, nil)
+       require.NoError(t, err)
+
+       tests := []struct {
+               name    string
+               offset  int64
+               readLen int
+               wantN   int
+               wantErr error
+       }{
+               {
+                       name:    "success full read",
+                       offset:  0,
+                       readLen: len(content),
+                       wantN:   len(content),
+                       wantErr: nil,
+               },
+               {
+                       name:    "partial read /unexpected EOF",
+                       offset:  2,
+                       readLen: 2 * len(content),
+                       wantN:   len(content) - 2,
+                       wantErr: io.ErrUnexpectedEOF,
+               },
+               {
+                       name:    "EOF read at end of file",
+                       offset:  int64(len(content)),
+                       readLen: 2 * len(content),
+                       wantN:   0,
+                       wantErr: io.EOF,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       var lastReaderClosed bool
+                       bfs := &blobFileIO{
+                               Bucket:       bucket,
+                               keyExtractor: func(path string) (string, error) 
{ return path, nil },
+                               ctx:          ctx,
+                               newRangeReader: func(ctx context.Context, key 
string, offset, length int64) (io.ReadCloser, error) {
+                                       r, err := bucket.NewRangeReader(ctx, 
key, offset, length, nil)
+                                       if err != nil {
+                                               return nil, err
+                                       }
+                                       lastReaderClosed = false
+
+                                       return &trackingReadCloser{ReadCloser: 
r, closed: &lastReaderClosed}, nil
+                               },
+                       }
+
+                       file, err := bfs.Open("test-file")
+                       require.NoError(t, err)
+                       defer file.Close()
+
+                       bof := file.(*blobOpenFile)
+
+                       buf := make([]byte, tt.readLen)
+                       n, err := bof.ReadAt(buf, tt.offset)
+
+                       assert.Equal(t, tt.wantN, n, "read byte count mismatch")
+                       if tt.wantErr == nil {
+                               assert.NoError(t, err)
+                       } else {
+                               assert.ErrorIs(t, err, tt.wantErr)
+                       }
+
+                       assert.True(t, lastReaderClosed, "resource leak: range 
reader was not closed")
+               })
+       }
+}

Reply via email to