This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 7d90a8d5 fix: improve error handling on defer Close() (#583)
7d90a8d5 is described below

commit 7d90a8d530e9b75a31e3936a95360f4410036e87
Author: Tobias Pütz <[email protected]>
AuthorDate: Mon Oct 6 18:58:36 2025 +0200

    fix: improve error handling on defer Close() (#583)
    
    Prior to this change, a failure to write metadata or data files could go
    unnoticed, since the error returned on `defer Close()` was unhandled. This PR:
    
    - uses named return parameter and defer closure to propagate errors on
    Close()
    - handles other unhandled errors
---
 catalog/internal/utils.go            |  7 +++---
 internal/mock_fs.go                  | 16 +++++++++++-
 internal/utils.go                    |  7 ++++++
 manifest.go                          |  4 +--
 table/arrow_scanner.go               |  7 +++---
 table/arrow_utils.go                 |  1 +
 table/internal/parquet_files.go      |  5 ++--
 table/internal/parquet_files_test.go | 47 ++++++++++++++++++++++++++++++++++++
 table/snapshot_producers.go          | 38 ++++++++++++++++++-----------
 table/snapshots.go                   |  5 ++--
 table/table.go                       |  4 +--
 11 files changed, 112 insertions(+), 29 deletions(-)

diff --git a/catalog/internal/utils.go b/catalog/internal/utils.go
index 493f32e9..2cbdc906 100644
--- a/catalog/internal/utils.go
+++ b/catalog/internal/utils.go
@@ -32,6 +32,7 @@ import (
 
        "github.com/apache/iceberg-go"
        "github.com/apache/iceberg-go/catalog"
+       "github.com/apache/iceberg-go/internal"
        "github.com/apache/iceberg-go/io"
        "github.com/apache/iceberg-go/table"
        "github.com/google/uuid"
@@ -344,7 +345,7 @@ func CreateViewMetadata(
        if err != nil {
                return "", fmt.Errorf("failed to create view metadata file: 
%w", err)
        }
-       defer out.Close()
+       defer internal.CheckedClose(out, &err)
 
        if _, err := out.Write(viewMetadataBytes); err != nil {
                return "", fmt.Errorf("failed to write view metadata: %w", err)
@@ -358,7 +359,7 @@ func LoadViewMetadata(ctx context.Context,
        metadataLocation string,
        viewName string,
        namespace string,
-) (map[string]interface{}, error) {
+) (_ map[string]interface{}, err error) {
        // Initial metadata with basic information
        viewMetadata := map[string]interface{}{
                "name":              viewName,
@@ -377,7 +378,7 @@ func LoadViewMetadata(ctx context.Context,
        if err != nil {
                return viewMetadata, fmt.Errorf("error encountered loading view 
metadata: %w", err)
        }
-       defer inputFile.Close()
+       defer internal.CheckedClose(inputFile, &err)
 
        // Decode the complete metadata
        var fullViewMetadata map[string]interface{}
diff --git a/internal/mock_fs.go b/internal/mock_fs.go
index 20b84f84..8e55fe08 100644
--- a/internal/mock_fs.go
+++ b/internal/mock_fs.go
@@ -20,6 +20,7 @@ package internal
 import (
        "bytes"
        "errors"
+       sio "io"
        "io/fs"
 
        "github.com/apache/iceberg-go/io"
@@ -52,6 +53,7 @@ func (m *MockFS) Remove(name string) error {
 
 type MockFSReadFile struct {
        MockFS
+       ErrOnClose bool
 }
 
 func (m *MockFSReadFile) ReadFile(name string) ([]byte, error) {
@@ -61,7 +63,8 @@ func (m *MockFSReadFile) ReadFile(name string) ([]byte, 
error) {
 }
 
 type MockFile struct {
-       Contents *bytes.Reader
+       Contents   *bytes.Reader
+       ErrOnClose bool
 
        closed bool
 }
@@ -74,7 +77,18 @@ func (m *MockFile) Read(p []byte) (int, error) {
        return m.Contents.Read(p)
 }
 
+func (m *MockFile) ReadFrom(r sio.Reader) (n int64, err error) {
+       return 0, nil
+}
+
+func (m *MockFile) Write(p []byte) (n int, err error) {
+       return len(p), nil
+}
+
 func (m *MockFile) Close() error {
+       if m.ErrOnClose {
+               return errors.New("error on close")
+       }
        if m.closed {
                return errors.New("already closed")
        }
diff --git a/internal/utils.go b/internal/utils.go
index 09d3b6a6..b8a62e53 100644
--- a/internal/utils.go
+++ b/internal/utils.go
@@ -19,6 +19,7 @@ package internal
 
 import (
        "cmp"
+       "errors"
        "fmt"
        "io"
        "iter"
@@ -172,3 +173,9 @@ func Counter(start int) iter.Seq[int] {
                }
        }
 }
+
+// CheckedClose is a helper function to close a resource and return an error 
if it fails.
+// It is intended to be used in a defer statement.
+func CheckedClose(c io.Closer, err *error) {
+       *err = errors.Join(*err, c.Close())
+}
diff --git a/manifest.go b/manifest.go
index 087f4202..6e98d8b2 100644
--- a/manifest.go
+++ b/manifest.go
@@ -460,12 +460,12 @@ type hasFieldToIDMap interface {
        setFieldIDToFixedSizeMap(map[int]int)
 }
 
-func fetchManifestEntries(m ManifestFile, fs iceio.IO, discardDeleted bool) 
([]ManifestEntry, error) {
+func fetchManifestEntries(m ManifestFile, fs iceio.IO, discardDeleted bool) (_ 
[]ManifestEntry, err error) {
        f, err := fs.Open(m.FilePath())
        if err != nil {
                return nil, err
        }
-       defer f.Close()
+       defer internal.CheckedClose(f, &err)
 
        return ReadManifest(m, f, discardDeleted)
 }
diff --git a/table/arrow_scanner.go b/table/arrow_scanner.go
index e94b6ebb..b41bea19 100644
--- a/table/arrow_scanner.go
+++ b/table/arrow_scanner.go
@@ -30,6 +30,7 @@ import (
        "github.com/apache/arrow-go/v18/arrow/compute/exprs"
        "github.com/apache/arrow-go/v18/arrow/memory"
        "github.com/apache/iceberg-go"
+       iceinternal "github.com/apache/iceberg-go/internal"
        iceio "github.com/apache/iceberg-go/io"
        "github.com/apache/iceberg-go/table/internal"
        "github.com/apache/iceberg-go/table/substrait"
@@ -98,7 +99,7 @@ func readAllDeleteFiles(ctx context.Context, fs iceio.IO, 
tasks []FileScanTask,
        return deletesPerFile, err
 }
 
-func readDeletes(ctx context.Context, fs iceio.IO, dataFile iceberg.DataFile) 
(map[string]*arrow.Chunked, error) {
+func readDeletes(ctx context.Context, fs iceio.IO, dataFile iceberg.DataFile) 
(_ map[string]*arrow.Chunked, err error) {
        src, err := internal.GetFile(ctx, fs, dataFile, true)
        if err != nil {
                return nil, err
@@ -108,7 +109,7 @@ func readDeletes(ctx context.Context, fs iceio.IO, dataFile 
iceberg.DataFile) (m
        if err != nil {
                return nil, err
        }
-       defer rdr.Close()
+       defer iceinternal.CheckedClose(rdr, &err)
 
        tbl, err := rdr.ReadTable(ctx)
        if err != nil {
@@ -404,7 +405,7 @@ func (as *arrowScan) recordsFromTask(ctx context.Context, 
task internal.Enumerat
        if err != nil {
                return err
        }
-       defer rdr.Close()
+       defer iceinternal.CheckedClose(rdr, &err)
 
        pipeline := make([]recProcessFn, 0, 2)
        if len(positionalDeletes) > 0 {
diff --git a/table/arrow_utils.go b/table/arrow_utils.go
index 9fd1eb96..4cd254e9 100644
--- a/table/arrow_utils.go
+++ b/table/arrow_utils.go
@@ -1203,6 +1203,7 @@ func filesToDataFiles(ctx context.Context, fileIO 
iceio.IO, meta *MetadataBuilde
                for filePath := range paths {
                        format := tblutils.FormatFromFileName(filePath)
                        rdr := must(format.Open(ctx, fileIO, filePath))
+                       // TODO: take a look at this defer Close() and consider 
refactoring
                        defer rdr.Close()
 
                        arrSchema := must(rdr.Schema())
diff --git a/table/internal/parquet_files.go b/table/internal/parquet_files.go
index 6eac4619..7ed2f309 100644
--- a/table/internal/parquet_files.go
+++ b/table/internal/parquet_files.go
@@ -236,12 +236,13 @@ func (parquetFormat) GetWriteProperties(props 
iceberg.Properties) any {
                parquet.WithCompressionLevel(compressionLevel))
 }
 
-func (p parquetFormat) WriteDataFile(ctx context.Context, fs 
iceio.WriteFileIO, partitionValues map[int]any, info WriteFileInfo, batches 
[]arrow.RecordBatch) (iceberg.DataFile, error) {
+func (p parquetFormat) WriteDataFile(ctx context.Context, fs 
iceio.WriteFileIO, partitionValues map[int]any, info WriteFileInfo, batches 
[]arrow.RecordBatch) (_ iceberg.DataFile, err error) {
        fw, err := fs.Create(info.FileName)
        if err != nil {
                return nil, err
        }
-       defer fw.Close()
+
+       defer internal.CheckedClose(fw, &err)
 
        cntWriter := internal.CountingWriter{W: fw}
        mem := compute.GetAllocator(ctx)
diff --git a/table/internal/parquet_files_test.go 
b/table/internal/parquet_files_test.go
index 19db8a72..47772cc8 100644
--- a/table/internal/parquet_files_test.go
+++ b/table/internal/parquet_files_test.go
@@ -19,11 +19,13 @@ package internal_test
 
 import (
        "bytes"
+       "context"
        "math/big"
        "strings"
        "testing"
        "time"
 
+       "github.com/apache/arrow-go/v18/arrow"
        "github.com/apache/arrow-go/v18/arrow/array"
        "github.com/apache/arrow-go/v18/arrow/decimal128"
        "github.com/apache/arrow-go/v18/arrow/memory"
@@ -32,6 +34,7 @@ import (
        "github.com/apache/arrow-go/v18/parquet/metadata"
        "github.com/apache/arrow-go/v18/parquet/pqarrow"
        "github.com/apache/iceberg-go"
+       internal2 "github.com/apache/iceberg-go/internal"
        "github.com/apache/iceberg-go/table"
        "github.com/apache/iceberg-go/table/internal"
        "github.com/google/uuid"
@@ -326,3 +329,47 @@ func TestMetricsPrimitiveTypes(t *testing.T) {
                Scale: 2,
        })
 }
+
+func TestWriteDataFileErrOnClose(t *testing.T) {
+       ctx := context.Background()
+       fm := internal.GetFileFormat(iceberg.ParquetFile)
+       mockfs := internal2.MockFS{}
+       mockfs.Test(t)
+
+       mockfs.On("Create", "f").Return(&internal2.MockFile{
+               ErrOnClose: true,
+       }, nil)
+
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(t, 0)
+
+       schema := arrow.NewSchema([]arrow.Field{
+               {
+                       Name: "nested",
+                       Type: arrow.ListOfField(arrow.Field{
+                               Name: "element", Type: 
arrow.PrimitiveTypes.Int32, Nullable: false,
+                               Metadata: 
arrow.NewMetadata([]string{table.ArrowParquetFieldIDKey}, []string{"2"}),
+                       }),
+                       Metadata: 
arrow.NewMetadata([]string{table.ArrowParquetFieldIDKey}, []string{"1"}),
+               },
+       }, nil)
+
+       bldr := array.NewRecordBuilder(mem, schema)
+       bldr.Field(0).AppendNull()
+       defer bldr.Release()
+
+       rec := bldr.NewRecordBatch()
+       defer rec.Release()
+
+       icesc, err := table.ArrowSchemaToIceberg(schema, false, nil)
+       require.NoError(t, err)
+
+       _, err = fm.WriteDataFile(ctx, &mockfs, nil, internal.WriteFileInfo{
+               FileSchema: icesc,
+               Spec:       iceberg.PartitionSpec{},
+               FileName:   "f",
+               StatsCols:  nil,
+               WriteProps: []parquet.WriterProperty{},
+       }, []arrow.RecordBatch{rec})
+       require.ErrorContains(t, err, "error on close")
+}
diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go
index 1cf4ecd5..7352ab20 100644
--- a/table/snapshot_producers.go
+++ b/table/snapshot_producers.go
@@ -18,6 +18,7 @@
 package table
 
 import (
+       "errors"
        "fmt"
        "io"
        "maps"
@@ -275,13 +276,19 @@ func (m *manifestMergeManager) createManifest(specID int, 
bin []iceberg.Manifest
                        switch {
                        case entry.Status() == iceberg.EntryStatusDELETED && 
entry.SnapshotID() == m.snap.snapshotID:
                                // only files deleted by this snapshot should 
be added to the new manifest
-                               wr.Delete(entry)
+                               if err = wr.Delete(entry); err != nil {
+                                       return nil, err
+                               }
                        case entry.Status() == iceberg.EntryStatusADDED && 
entry.SnapshotID() == m.snap.snapshotID:
                                // added entries from this snapshot are still 
added, otherwise they should be existing
-                               wr.Add(entry)
+                               if err = wr.Add(entry); err != nil {
+                                       return nil, err
+                               }
                        case entry.Status() != iceberg.EntryStatusDELETED:
                                // add all non-deleted files from the old 
manifest as existing files
-                               wr.Existing(entry)
+                               if err = wr.Existing(entry); err != nil {
+                                       return nil, err
+                               }
                        }
                }
        }
@@ -474,7 +481,7 @@ func (sp *snapshotProducer) deleteDataFile(df 
iceberg.DataFile) *snapshotProduce
        return sp
 }
 
-func (sp *snapshotProducer) newManifestWriter(spec iceberg.PartitionSpec) 
(*iceberg.ManifestWriter, string, *internal.CountingWriter, error) {
+func (sp *snapshotProducer) newManifestWriter(spec iceberg.PartitionSpec) (_ 
*iceberg.ManifestWriter, _ string, _ *internal.CountingWriter, err error) {
        out, path, err := sp.newManifestOutput()
        if err != nil {
                return nil, "", nil, err
@@ -484,9 +491,7 @@ func (sp *snapshotProducer) newManifestWriter(spec 
iceberg.PartitionSpec) (*iceb
        wr, err := iceberg.NewManifestWriter(sp.txn.meta.formatVersion, 
counter, spec,
                sp.txn.meta.CurrentSchema(), sp.snapshotID)
        if err != nil {
-               defer out.Close()
-
-               return nil, "", nil, err
+               return nil, "", nil, errors.Join(err, out.Close())
        }
 
        return wr, path, counter, nil
@@ -511,7 +516,7 @@ func (sp *snapshotProducer) fetchManifestEntry(m 
iceberg.ManifestFile, discardDe
        return m.FetchEntries(sp.io, discardDeleted)
 }
 
-func (sp *snapshotProducer) manifests() ([]iceberg.ManifestFile, error) {
+func (sp *snapshotProducer) manifests() (_ []iceberg.ManifestFile, err error) {
        var g errgroup.Group
 
        results := [...][]iceberg.ManifestFile{nil, nil, nil}
@@ -522,7 +527,7 @@ func (sp *snapshotProducer) manifests() 
([]iceberg.ManifestFile, error) {
                        if err != nil {
                                return err
                        }
-                       defer out.Close()
+                       defer internal.CheckedClose(out, &err)
 
                        counter := &internal.CountingWriter{W: out}
                        currentSpec, err := sp.txn.meta.CurrentSpec()
@@ -578,7 +583,7 @@ func (sp *snapshotProducer) manifests() 
([]iceberg.ManifestFile, error) {
                                if err != nil {
                                        return err
                                }
-                               defer out.Close()
+                               defer internal.CheckedClose(out, &err)
 
                                mf, err := iceberg.WriteManifest(path, out, 
sp.txn.meta.formatVersion,
                                        sp.spec(specid), 
sp.txn.meta.CurrentSchema(), sp.snapshotID, entries)
@@ -623,13 +628,17 @@ func (sp *snapshotProducer) summary(props 
iceberg.Properties) (Summary, error) {
                return Summary{}, fmt.Errorf("could not get current partition 
spec: %w", err)
        }
        for _, df := range sp.addedFiles {
-               ssc.addFile(df, currentSchema, *partitionSpec)
+               if err = ssc.addFile(df, currentSchema, *partitionSpec); err != 
nil {
+                       return Summary{}, err
+               }
        }
 
        if len(sp.deletedFiles) > 0 {
                specs := sp.txn.meta.specs
                for _, df := range sp.deletedFiles {
-                       ssc.removeFile(df, currentSchema, specs[df.SpecID()])
+                       if err = ssc.removeFile(df, currentSchema, 
specs[df.SpecID()]); err != nil {
+                               return Summary{}, err
+                       }
                }
        }
 
@@ -652,7 +661,7 @@ func (sp *snapshotProducer) summary(props 
iceberg.Properties) (Summary, error) {
        }, previousSummary)
 }
 
-func (sp *snapshotProducer) commit() ([]Update, []Requirement, error) {
+func (sp *snapshotProducer) commit() (_ []Update, _ []Requirement, err error) {
        newManifests, err := sp.manifests()
        if err != nil {
                return nil, nil, err
@@ -681,7 +690,8 @@ func (sp *snapshotProducer) commit() ([]Update, 
[]Requirement, error) {
        if err != nil {
                return nil, nil, err
        }
-       defer out.Close()
+       defer internal.CheckedClose(out, &err)
+
        // TODO: Implement v3 here
        err = iceberg.WriteManifestList(sp.txn.meta.formatVersion, out,
                sp.snapshotID, parentSnapshot, &nextSequence, 0, newManifests)
diff --git a/table/snapshots.go b/table/snapshots.go
index 6485f62e..0bc2249c 100644
--- a/table/snapshots.go
+++ b/table/snapshots.go
@@ -28,6 +28,7 @@ import (
        "strings"
 
        "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/internal"
        iceio "github.com/apache/iceberg-go/io"
 )
 
@@ -293,13 +294,13 @@ func (s Snapshot) Equals(other Snapshot) bool {
                s.Summary.Equals(other.Summary)
 }
 
-func (s Snapshot) Manifests(fio iceio.IO) ([]iceberg.ManifestFile, error) {
+func (s Snapshot) Manifests(fio iceio.IO) (_ []iceberg.ManifestFile, err 
error) {
        if s.ManifestList != "" {
                f, err := fio.Open(s.ManifestList)
                if err != nil {
                        return nil, fmt.Errorf("could not open manifest file: 
%w", err)
                }
-               defer f.Close()
+               defer internal.CheckedClose(f, &err)
 
                return iceberg.ReadManifestList(f)
        }
diff --git a/table/table.go b/table/table.go
index 02240eed..04a0803b 100644
--- a/table/table.go
+++ b/table/table.go
@@ -390,7 +390,7 @@ func NewFromLocation(
        metalocation string,
        fsysF FSysF,
        cat CatalogIO,
-) (*Table, error) {
+) (_ *Table, err error) {
        var meta Metadata
 
        fsys, err := fsysF(ctx)
@@ -411,7 +411,7 @@ func NewFromLocation(
                if err != nil {
                        return nil, err
                }
-               defer f.Close()
+               defer internal.CheckedClose(f, &err)
 
                if meta, err = ParseMetadata(f); err != nil {
                        return nil, err

Reply via email to