This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 7d90a8d5 fix: improve error handling on defer Close() (#583)
7d90a8d5 is described below
commit 7d90a8d530e9b75a31e3936a95360f4410036e87
Author: Tobias Pütz <[email protected]>
AuthorDate: Mon Oct 6 18:58:36 2025 +0200
fix: improve error handling on defer Close() (#583)
Prior to this change, a failure to write metadata or data files could go
unnoticed because the error returned by a deferred `Close()` was ignored. This PR:
- uses named return parameters and a deferred helper (`internal.CheckedClose`)
to propagate errors from Close()
- handles other previously unhandled errors
---
catalog/internal/utils.go | 7 +++---
internal/mock_fs.go | 16 +++++++++++-
internal/utils.go | 7 ++++++
manifest.go | 4 +--
table/arrow_scanner.go | 7 +++---
table/arrow_utils.go | 1 +
table/internal/parquet_files.go | 5 ++--
table/internal/parquet_files_test.go | 47 ++++++++++++++++++++++++++++++++++++
table/snapshot_producers.go | 38 ++++++++++++++++++-----------
table/snapshots.go | 5 ++--
table/table.go | 4 +--
11 files changed, 112 insertions(+), 29 deletions(-)
diff --git a/catalog/internal/utils.go b/catalog/internal/utils.go
index 493f32e9..2cbdc906 100644
--- a/catalog/internal/utils.go
+++ b/catalog/internal/utils.go
@@ -32,6 +32,7 @@ import (
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/catalog"
+ "github.com/apache/iceberg-go/internal"
"github.com/apache/iceberg-go/io"
"github.com/apache/iceberg-go/table"
"github.com/google/uuid"
@@ -344,7 +345,7 @@ func CreateViewMetadata(
if err != nil {
return "", fmt.Errorf("failed to create view metadata file:
%w", err)
}
- defer out.Close()
+ defer internal.CheckedClose(out, &err)
if _, err := out.Write(viewMetadataBytes); err != nil {
return "", fmt.Errorf("failed to write view metadata: %w", err)
@@ -358,7 +359,7 @@ func LoadViewMetadata(ctx context.Context,
metadataLocation string,
viewName string,
namespace string,
-) (map[string]interface{}, error) {
+) (_ map[string]interface{}, err error) {
// Initial metadata with basic information
viewMetadata := map[string]interface{}{
"name": viewName,
@@ -377,7 +378,7 @@ func LoadViewMetadata(ctx context.Context,
if err != nil {
return viewMetadata, fmt.Errorf("error encountered loading view
metadata: %w", err)
}
- defer inputFile.Close()
+ defer internal.CheckedClose(inputFile, &err)
// Decode the complete metadata
var fullViewMetadata map[string]interface{}
diff --git a/internal/mock_fs.go b/internal/mock_fs.go
index 20b84f84..8e55fe08 100644
--- a/internal/mock_fs.go
+++ b/internal/mock_fs.go
@@ -20,6 +20,7 @@ package internal
import (
"bytes"
"errors"
+ sio "io"
"io/fs"
"github.com/apache/iceberg-go/io"
@@ -52,6 +53,7 @@ func (m *MockFS) Remove(name string) error {
type MockFSReadFile struct {
MockFS
+ ErrOnClose bool
}
func (m *MockFSReadFile) ReadFile(name string) ([]byte, error) {
@@ -61,7 +63,8 @@ func (m *MockFSReadFile) ReadFile(name string) ([]byte,
error) {
}
type MockFile struct {
- Contents *bytes.Reader
+ Contents *bytes.Reader
+ ErrOnClose bool
closed bool
}
@@ -74,7 +77,18 @@ func (m *MockFile) Read(p []byte) (int, error) {
return m.Contents.Read(p)
}
+func (m *MockFile) ReadFrom(r sio.Reader) (n int64, err error) {
+ return 0, nil
+}
+
+func (m *MockFile) Write(p []byte) (n int, err error) {
+ return len(p), nil
+}
+
func (m *MockFile) Close() error {
+ if m.ErrOnClose {
+ return errors.New("error on close")
+ }
if m.closed {
return errors.New("already closed")
}
diff --git a/internal/utils.go b/internal/utils.go
index 09d3b6a6..b8a62e53 100644
--- a/internal/utils.go
+++ b/internal/utils.go
@@ -19,6 +19,7 @@ package internal
import (
"cmp"
+ "errors"
"fmt"
"io"
"iter"
@@ -172,3 +173,9 @@ func Counter(start int) iter.Seq[int] {
}
}
}
+
+// CheckedClose is a helper function to close a resource and return an error
if it fails.
+// It is intended to be used in a defer statement.
+func CheckedClose(c io.Closer, err *error) {
+ *err = errors.Join(*err, c.Close())
+}
diff --git a/manifest.go b/manifest.go
index 087f4202..6e98d8b2 100644
--- a/manifest.go
+++ b/manifest.go
@@ -460,12 +460,12 @@ type hasFieldToIDMap interface {
setFieldIDToFixedSizeMap(map[int]int)
}
-func fetchManifestEntries(m ManifestFile, fs iceio.IO, discardDeleted bool)
([]ManifestEntry, error) {
+func fetchManifestEntries(m ManifestFile, fs iceio.IO, discardDeleted bool) (_
[]ManifestEntry, err error) {
f, err := fs.Open(m.FilePath())
if err != nil {
return nil, err
}
- defer f.Close()
+ defer internal.CheckedClose(f, &err)
return ReadManifest(m, f, discardDeleted)
}
diff --git a/table/arrow_scanner.go b/table/arrow_scanner.go
index e94b6ebb..b41bea19 100644
--- a/table/arrow_scanner.go
+++ b/table/arrow_scanner.go
@@ -30,6 +30,7 @@ import (
"github.com/apache/arrow-go/v18/arrow/compute/exprs"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/iceberg-go"
+ iceinternal "github.com/apache/iceberg-go/internal"
iceio "github.com/apache/iceberg-go/io"
"github.com/apache/iceberg-go/table/internal"
"github.com/apache/iceberg-go/table/substrait"
@@ -98,7 +99,7 @@ func readAllDeleteFiles(ctx context.Context, fs iceio.IO,
tasks []FileScanTask,
return deletesPerFile, err
}
-func readDeletes(ctx context.Context, fs iceio.IO, dataFile iceberg.DataFile)
(map[string]*arrow.Chunked, error) {
+func readDeletes(ctx context.Context, fs iceio.IO, dataFile iceberg.DataFile)
(_ map[string]*arrow.Chunked, err error) {
src, err := internal.GetFile(ctx, fs, dataFile, true)
if err != nil {
return nil, err
@@ -108,7 +109,7 @@ func readDeletes(ctx context.Context, fs iceio.IO, dataFile
iceberg.DataFile) (m
if err != nil {
return nil, err
}
- defer rdr.Close()
+ defer iceinternal.CheckedClose(rdr, &err)
tbl, err := rdr.ReadTable(ctx)
if err != nil {
@@ -404,7 +405,7 @@ func (as *arrowScan) recordsFromTask(ctx context.Context,
task internal.Enumerat
if err != nil {
return err
}
- defer rdr.Close()
+ defer iceinternal.CheckedClose(rdr, &err)
pipeline := make([]recProcessFn, 0, 2)
if len(positionalDeletes) > 0 {
diff --git a/table/arrow_utils.go b/table/arrow_utils.go
index 9fd1eb96..4cd254e9 100644
--- a/table/arrow_utils.go
+++ b/table/arrow_utils.go
@@ -1203,6 +1203,7 @@ func filesToDataFiles(ctx context.Context, fileIO
iceio.IO, meta *MetadataBuilde
for filePath := range paths {
format := tblutils.FormatFromFileName(filePath)
rdr := must(format.Open(ctx, fileIO, filePath))
+ // TODO: take a look at this defer Close() and consider
refactoring
defer rdr.Close()
arrSchema := must(rdr.Schema())
diff --git a/table/internal/parquet_files.go b/table/internal/parquet_files.go
index 6eac4619..7ed2f309 100644
--- a/table/internal/parquet_files.go
+++ b/table/internal/parquet_files.go
@@ -236,12 +236,13 @@ func (parquetFormat) GetWriteProperties(props
iceberg.Properties) any {
parquet.WithCompressionLevel(compressionLevel))
}
-func (p parquetFormat) WriteDataFile(ctx context.Context, fs
iceio.WriteFileIO, partitionValues map[int]any, info WriteFileInfo, batches
[]arrow.RecordBatch) (iceberg.DataFile, error) {
+func (p parquetFormat) WriteDataFile(ctx context.Context, fs
iceio.WriteFileIO, partitionValues map[int]any, info WriteFileInfo, batches
[]arrow.RecordBatch) (_ iceberg.DataFile, err error) {
fw, err := fs.Create(info.FileName)
if err != nil {
return nil, err
}
- defer fw.Close()
+
+ defer internal.CheckedClose(fw, &err)
cntWriter := internal.CountingWriter{W: fw}
mem := compute.GetAllocator(ctx)
diff --git a/table/internal/parquet_files_test.go
b/table/internal/parquet_files_test.go
index 19db8a72..47772cc8 100644
--- a/table/internal/parquet_files_test.go
+++ b/table/internal/parquet_files_test.go
@@ -19,11 +19,13 @@ package internal_test
import (
"bytes"
+ "context"
"math/big"
"strings"
"testing"
"time"
+ "github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/decimal128"
"github.com/apache/arrow-go/v18/arrow/memory"
@@ -32,6 +34,7 @@ import (
"github.com/apache/arrow-go/v18/parquet/metadata"
"github.com/apache/arrow-go/v18/parquet/pqarrow"
"github.com/apache/iceberg-go"
+ internal2 "github.com/apache/iceberg-go/internal"
"github.com/apache/iceberg-go/table"
"github.com/apache/iceberg-go/table/internal"
"github.com/google/uuid"
@@ -326,3 +329,47 @@ func TestMetricsPrimitiveTypes(t *testing.T) {
Scale: 2,
})
}
+
+func TestWriteDataFileErrOnClose(t *testing.T) {
+ ctx := context.Background()
+ fm := internal.GetFileFormat(iceberg.ParquetFile)
+ mockfs := internal2.MockFS{}
+ mockfs.Test(t)
+
+ mockfs.On("Create", "f").Return(&internal2.MockFile{
+ ErrOnClose: true,
+ }, nil)
+
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(t, 0)
+
+ schema := arrow.NewSchema([]arrow.Field{
+ {
+ Name: "nested",
+ Type: arrow.ListOfField(arrow.Field{
+ Name: "element", Type:
arrow.PrimitiveTypes.Int32, Nullable: false,
+ Metadata:
arrow.NewMetadata([]string{table.ArrowParquetFieldIDKey}, []string{"2"}),
+ }),
+ Metadata:
arrow.NewMetadata([]string{table.ArrowParquetFieldIDKey}, []string{"1"}),
+ },
+ }, nil)
+
+ bldr := array.NewRecordBuilder(mem, schema)
+ bldr.Field(0).AppendNull()
+ defer bldr.Release()
+
+ rec := bldr.NewRecordBatch()
+ defer rec.Release()
+
+ icesc, err := table.ArrowSchemaToIceberg(schema, false, nil)
+ require.NoError(t, err)
+
+ _, err = fm.WriteDataFile(ctx, &mockfs, nil, internal.WriteFileInfo{
+ FileSchema: icesc,
+ Spec: iceberg.PartitionSpec{},
+ FileName: "f",
+ StatsCols: nil,
+ WriteProps: []parquet.WriterProperty{},
+ }, []arrow.RecordBatch{rec})
+ require.ErrorContains(t, err, "error on close")
+}
diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go
index 1cf4ecd5..7352ab20 100644
--- a/table/snapshot_producers.go
+++ b/table/snapshot_producers.go
@@ -18,6 +18,7 @@
package table
import (
+ "errors"
"fmt"
"io"
"maps"
@@ -275,13 +276,19 @@ func (m *manifestMergeManager) createManifest(specID int,
bin []iceberg.Manifest
switch {
case entry.Status() == iceberg.EntryStatusDELETED &&
entry.SnapshotID() == m.snap.snapshotID:
// only files deleted by this snapshot should
be added to the new manifest
- wr.Delete(entry)
+ if err = wr.Delete(entry); err != nil {
+ return nil, err
+ }
case entry.Status() == iceberg.EntryStatusADDED &&
entry.SnapshotID() == m.snap.snapshotID:
// added entries from this snapshot are still
added, otherwise they should be existing
- wr.Add(entry)
+ if err = wr.Add(entry); err != nil {
+ return nil, err
+ }
case entry.Status() != iceberg.EntryStatusDELETED:
// add all non-deleted files from the old
manifest as existing files
- wr.Existing(entry)
+ if err = wr.Existing(entry); err != nil {
+ return nil, err
+ }
}
}
}
@@ -474,7 +481,7 @@ func (sp *snapshotProducer) deleteDataFile(df
iceberg.DataFile) *snapshotProduce
return sp
}
-func (sp *snapshotProducer) newManifestWriter(spec iceberg.PartitionSpec)
(*iceberg.ManifestWriter, string, *internal.CountingWriter, error) {
+func (sp *snapshotProducer) newManifestWriter(spec iceberg.PartitionSpec) (_
*iceberg.ManifestWriter, _ string, _ *internal.CountingWriter, err error) {
out, path, err := sp.newManifestOutput()
if err != nil {
return nil, "", nil, err
@@ -484,9 +491,7 @@ func (sp *snapshotProducer) newManifestWriter(spec
iceberg.PartitionSpec) (*iceb
wr, err := iceberg.NewManifestWriter(sp.txn.meta.formatVersion,
counter, spec,
sp.txn.meta.CurrentSchema(), sp.snapshotID)
if err != nil {
- defer out.Close()
-
- return nil, "", nil, err
+ return nil, "", nil, errors.Join(err, out.Close())
}
return wr, path, counter, nil
@@ -511,7 +516,7 @@ func (sp *snapshotProducer) fetchManifestEntry(m
iceberg.ManifestFile, discardDe
return m.FetchEntries(sp.io, discardDeleted)
}
-func (sp *snapshotProducer) manifests() ([]iceberg.ManifestFile, error) {
+func (sp *snapshotProducer) manifests() (_ []iceberg.ManifestFile, err error) {
var g errgroup.Group
results := [...][]iceberg.ManifestFile{nil, nil, nil}
@@ -522,7 +527,7 @@ func (sp *snapshotProducer) manifests()
([]iceberg.ManifestFile, error) {
if err != nil {
return err
}
- defer out.Close()
+ defer internal.CheckedClose(out, &err)
counter := &internal.CountingWriter{W: out}
currentSpec, err := sp.txn.meta.CurrentSpec()
@@ -578,7 +583,7 @@ func (sp *snapshotProducer) manifests()
([]iceberg.ManifestFile, error) {
if err != nil {
return err
}
- defer out.Close()
+ defer internal.CheckedClose(out, &err)
mf, err := iceberg.WriteManifest(path, out,
sp.txn.meta.formatVersion,
sp.spec(specid),
sp.txn.meta.CurrentSchema(), sp.snapshotID, entries)
@@ -623,13 +628,17 @@ func (sp *snapshotProducer) summary(props
iceberg.Properties) (Summary, error) {
return Summary{}, fmt.Errorf("could not get current partition
spec: %w", err)
}
for _, df := range sp.addedFiles {
- ssc.addFile(df, currentSchema, *partitionSpec)
+ if err = ssc.addFile(df, currentSchema, *partitionSpec); err !=
nil {
+ return Summary{}, err
+ }
}
if len(sp.deletedFiles) > 0 {
specs := sp.txn.meta.specs
for _, df := range sp.deletedFiles {
- ssc.removeFile(df, currentSchema, specs[df.SpecID()])
+ if err = ssc.removeFile(df, currentSchema,
specs[df.SpecID()]); err != nil {
+ return Summary{}, err
+ }
}
}
@@ -652,7 +661,7 @@ func (sp *snapshotProducer) summary(props
iceberg.Properties) (Summary, error) {
}, previousSummary)
}
-func (sp *snapshotProducer) commit() ([]Update, []Requirement, error) {
+func (sp *snapshotProducer) commit() (_ []Update, _ []Requirement, err error) {
newManifests, err := sp.manifests()
if err != nil {
return nil, nil, err
@@ -681,7 +690,8 @@ func (sp *snapshotProducer) commit() ([]Update,
[]Requirement, error) {
if err != nil {
return nil, nil, err
}
- defer out.Close()
+ defer internal.CheckedClose(out, &err)
+
// TODO: Implement v3 here
err = iceberg.WriteManifestList(sp.txn.meta.formatVersion, out,
sp.snapshotID, parentSnapshot, &nextSequence, 0, newManifests)
diff --git a/table/snapshots.go b/table/snapshots.go
index 6485f62e..0bc2249c 100644
--- a/table/snapshots.go
+++ b/table/snapshots.go
@@ -28,6 +28,7 @@ import (
"strings"
"github.com/apache/iceberg-go"
+ "github.com/apache/iceberg-go/internal"
iceio "github.com/apache/iceberg-go/io"
)
@@ -293,13 +294,13 @@ func (s Snapshot) Equals(other Snapshot) bool {
s.Summary.Equals(other.Summary)
}
-func (s Snapshot) Manifests(fio iceio.IO) ([]iceberg.ManifestFile, error) {
+func (s Snapshot) Manifests(fio iceio.IO) (_ []iceberg.ManifestFile, err
error) {
if s.ManifestList != "" {
f, err := fio.Open(s.ManifestList)
if err != nil {
return nil, fmt.Errorf("could not open manifest file:
%w", err)
}
- defer f.Close()
+ defer internal.CheckedClose(f, &err)
return iceberg.ReadManifestList(f)
}
diff --git a/table/table.go b/table/table.go
index 02240eed..04a0803b 100644
--- a/table/table.go
+++ b/table/table.go
@@ -390,7 +390,7 @@ func NewFromLocation(
metalocation string,
fsysF FSysF,
cat CatalogIO,
-) (*Table, error) {
+) (_ *Table, err error) {
var meta Metadata
fsys, err := fsysF(ctx)
@@ -411,7 +411,7 @@ func NewFromLocation(
if err != nil {
return nil, err
}
- defer f.Close()
+ defer internal.CheckedClose(f, &err)
if meta, err = ParseMetadata(f); err != nil {
return nil, err