This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new f1a08a23 fix(io): handle leak for error cases of open blob (#688)
f1a08a23 is described below
commit f1a08a235284202097d19fa07ffcfbe8f9889523
Author: ferhat elmas <[email protected]>
AuthorDate: Thu Jan 22 18:24:48 2026 +0100
fix(io): handle leak for error cases of open blob (#688)
related to #644, #681
Signed-off-by: ferhat elmas <[email protected]>
---
io/blob.go | 30 +++++++++---------
io/blob_test.go | 94 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 108 insertions(+), 16 deletions(-)
diff --git a/io/blob.go b/io/blob.go
index bff385fe..8fae7be5 100644
--- a/io/blob.go
+++ b/io/blob.go
@@ -42,26 +42,20 @@ type blobOpenFile struct {
ctx context.Context
}
-func (f *blobOpenFile) ReadAt(p []byte, off int64) (int, error) {
- rdr, err := f.b.Bucket.NewRangeReader(f.ctx, f.key, off, int64(len(p)), nil)
- if err != nil {
- return 0, err
+func (f *blobOpenFile) ReadAt(p []byte, off int64) (n int, err error) {
+ var rdr io.ReadCloser
+ if f.b.newRangeReader != nil {
+ rdr, err = f.b.newRangeReader(f.ctx, f.key, off, int64(len(p)))
+ } else {
+ rdr, err = f.b.Bucket.NewRangeReader(f.ctx, f.key, off, int64(len(p)), nil)
}
-
- // ensure the buffer is read, or EOF is reached for this read of this "chunk"
- // given we are using offsets to read this block, it is constrained by size of 'p'
- size, err := io.ReadFull(rdr, p)
if err != nil {
- if errors.Is(err, io.EOF) {
- return size, err
- }
- // check we are at the end of the underlying file
- if off+int64(size) > f.Size() {
- return size, err
- }
+ return 0, err
}
+ // not using internal.CheckedClose due to import cycle
+ defer func() { err = errors.Join(err, rdr.Close()) }()
- return size, rdr.Close()
+ return io.ReadFull(rdr, p)
}
// Functions to implement the `Stat()` function in the `io/fs.File` interface
@@ -99,6 +93,10 @@ type blobFileIO struct {
keyExtractor KeyExtractor
ctx context.Context
+
+ // newRangeReader is an optional hook for testing.
+ // It allows injecting a mock reader to verify Close calls.
+ newRangeReader func(ctx context.Context, key string, offset, length int64) (io.ReadCloser, error)
}
func (bfs *blobFileIO) preprocess(path string) (string, error) {
diff --git a/io/blob_test.go b/io/blob_test.go
index 32b45553..fe614b67 100644
--- a/io/blob_test.go
+++ b/io/blob_test.go
@@ -18,9 +18,13 @@
package io
import (
+ "context"
+ "io"
"testing"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gocloud.dev/blob/memblob"
)
func TestDefaultKeyExtractor(t *testing.T) {
@@ -76,3 +80,93 @@ func TestDefaultKeyExtractor(t *testing.T) {
})
}
}
+
+type trackingReadCloser struct {
+ io.ReadCloser
+ closed *bool
+}
+
+func (t *trackingReadCloser) Close() error {
+ *t.closed = true
+
+ return t.ReadCloser.Close()
+}
+
+func TestReadAtResourceCleanup(t *testing.T) {
+ ctx := context.Background()
+
+ bucket := memblob.OpenBucket(nil)
+ defer bucket.Close()
+
+ content := []byte("short")
+ err := bucket.WriteAll(ctx, "test-file", content, nil)
+ require.NoError(t, err)
+
+ tests := []struct {
+ name string
+ offset int64
+ readLen int
+ wantN int
+ wantErr error
+ }{
+ {
+ name: "success full read",
+ offset: 0,
+ readLen: len(content),
+ wantN: len(content),
+ wantErr: nil,
+ },
+ {
+ name: "partial read /unexpected EOF",
+ offset: 2,
+ readLen: 2 * len(content),
+ wantN: len(content) - 2,
+ wantErr: io.ErrUnexpectedEOF,
+ },
+ {
+ name: "EOF read at end of file",
+ offset: int64(len(content)),
+ readLen: 2 * len(content),
+ wantN: 0,
+ wantErr: io.EOF,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ var lastReaderClosed bool
+ bfs := &blobFileIO{
+ Bucket: bucket,
+ keyExtractor: func(path string) (string, error) { return path, nil },
+ ctx: ctx,
+ newRangeReader: func(ctx context.Context, key string, offset, length int64) (io.ReadCloser, error) {
+ r, err := bucket.NewRangeReader(ctx, key, offset, length, nil)
+ if err != nil {
+ return nil, err
+ }
+ lastReaderClosed = false
+
+ return &trackingReadCloser{ReadCloser: r, closed: &lastReaderClosed}, nil
+ },
+ }
+
+ file, err := bfs.Open("test-file")
+ require.NoError(t, err)
+ defer file.Close()
+
+ bof := file.(*blobOpenFile)
+
+ buf := make([]byte, tt.readLen)
+ n, err := bof.ReadAt(buf, tt.offset)
+
+ assert.Equal(t, tt.wantN, n, "read byte count mismatch")
+ if tt.wantErr == nil {
+ assert.NoError(t, err)
+ } else {
+ assert.ErrorIs(t, err, tt.wantErr)
+ }
+
+ assert.True(t, lastReaderClosed, "resource leak: range reader was not closed")
+ })
+ }
+}