This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 1d3320a1 fix(table): close writers on error for every exit path (#667)
1d3320a1 is described below

commit 1d3320a113a0f08999a470cf3e928888045829bd
Author: ferhat elmas <[email protected]>
AuthorDate: Thu Jan 8 17:20:31 2026 +0100

    fix(table): close writers on error for every exit path (#667)
    
    related to #644
    
    Signed-off-by: ferhat elmas <[email protected]>
---
 manifest.go                      |  17 +--
 manifest_test.go                 |  62 +++++++++
 table/snapshot_producers.go      |  24 ++--
 table/snapshot_producers_test.go | 286 +++++++++++++++++++++++++++++++++++++++
 4 files changed, 367 insertions(+), 22 deletions(-)

diff --git a/manifest.go b/manifest.go
index a0d4c611..c02c79da 100644
--- a/manifest.go
+++ b/manifest.go
@@ -1442,11 +1442,8 @@ func (m *ManifestListWriter) AddManifests(files []ManifestFile) error {
 }
 
 // WriteManifestList writes a list of manifest files to an avro file.
-func WriteManifestList(version int, out io.Writer, snapshotID int64, 
parentSnapshotID, sequenceNumber *int64, firstRowId int64, files 
[]ManifestFile) error {
-       var (
-               writer *ManifestListWriter
-               err    error
-       )
+func WriteManifestList(version int, out io.Writer, snapshotID int64, 
parentSnapshotID, sequenceNumber *int64, firstRowId int64, files 
[]ManifestFile) (err error) {
+       var writer *ManifestListWriter
 
        switch version {
        case 1:
@@ -1468,12 +1465,9 @@ func WriteManifestList(version int, out io.Writer, snapshotID int64, parentSnaps
        if err != nil {
                return err
        }
+       defer internal.CheckedClose(writer, &err)
 
-       if err = writer.AddManifests(files); err != nil {
-               return err
-       }
-
-       return writer.Close()
+       return writer.AddManifests(files)
 }
 
 func WriteManifest(
@@ -1484,13 +1478,14 @@ func WriteManifest(
        schema *Schema,
        snapshotID int64,
        entries []ManifestEntry,
-) (ManifestFile, error) {
+) (mf ManifestFile, err error) {
        cnt := &internal.CountingWriter{W: out}
 
        w, err := NewManifestWriter(version, cnt, spec, schema, snapshotID)
        if err != nil {
                return nil, err
        }
+       defer internal.CheckedClose(w, &err)
 
        for _, entry := range entries {
                if err := w.addEntry(entry.(*manifestEntry)); err != nil {
diff --git a/manifest_test.go b/manifest_test.go
index 2ad2edaa..1192474e 100644
--- a/manifest_test.go
+++ b/manifest_test.go
@@ -19,6 +19,7 @@ package iceberg
 
 import (
        "bytes"
+       "errors"
        "io"
        "testing"
        "time"
@@ -1447,3 +1448,64 @@ func (m *ManifestTestSuite) TestV3PrepareEntrySequenceNumberValidation() {
        m.Require().NoError(err)
        m.Equal(entry4, result)
 }
+
+var errLimitedWrite = errors.New("write limit exceeded")
+
+type limitedWriter struct {
+       limit   int
+       written int
+       err     error
+}
+
+func (w *limitedWriter) Write(p []byte) (int, error) {
+       if w.written+len(p) > w.limit {
+               return 0, w.err
+       }
+       w.written += len(p)
+
+       return len(p), nil
+}
+
+func (m *ManifestTestSuite) TestWriteManifestListClosesWriterOnError() {
+       seqNum := int64(7)
+       var header bytes.Buffer
+       writer, err := NewManifestListWriterV2(&header, snapshotID, seqNum, nil)
+       m.Require().NoError(err)
+       m.Require().NoError(writer.Close())
+
+       out := &limitedWriter{limit: header.Len(), err: errLimitedWrite}
+       err = WriteManifestList(2, out, snapshotID, nil, &seqNum, 0, 
[]ManifestFile{
+               manifestFileRecordsV2[0],
+               manifestFileRecordsV1[0],
+       })
+       m.Require().Error(err)
+       m.Require().ErrorContains(err, "ManifestListWriter only supports 
version 2 manifest files")
+       m.Require().ErrorIs(err, errLimitedWrite)
+}
+
+func (m *ManifestTestSuite) TestWriteManifestClosesWriterOnEntryError() {
+       partitionSpec := NewPartitionSpecID(1,
+               PartitionField{FieldID: 1000, SourceID: 1, Name: "VendorID", 
Transform: IdentityTransform{}},
+               PartitionField{FieldID: 1001, SourceID: 2, Name: 
"tpep_pickup_datetime", Transform: IdentityTransform{}})
+
+       var header bytes.Buffer
+       writer, err := NewManifestWriter(2, &header, partitionSpec, testSchema, 
entrySnapshotID)
+       m.Require().NoError(err)
+       headerLen := header.Len()
+
+       m.Require().NoError(writer.Add(manifestEntryV2Records[0]))
+       m.Require().NoError(writer.Close())
+
+       badEntry := *manifestEntryV2Records[1]
+       badEntry.EntryStatus = EntryStatusEXISTING
+       badEntry.SeqNum = nil
+
+       out := &limitedWriter{limit: headerLen, err: errLimitedWrite}
+       _, err = WriteManifest("test.avro", out, 2, partitionSpec, testSchema, 
entrySnapshotID, []ManifestEntry{
+               manifestEntryV2Records[0],
+               &badEntry,
+       })
+       m.Require().Error(err)
+       m.Require().ErrorContains(err, "only entries with status ADDED")
+       m.Require().ErrorIs(err, errLimitedWrite)
+}
diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go
index 7352ab20..fbb082fe 100644
--- a/table/snapshot_producers.go
+++ b/table/snapshot_producers.go
@@ -169,6 +169,8 @@ func (of *overwriteFiles) existingManifests() ([]iceberg.ManifestFile, error) {
 
                for _, entry := range notDeleted {
                        if err := wr.Existing(entry); err != nil {
+                               internal.CheckedClose(wr, &err)
+
                                return existingFiles, err
                        }
                }
@@ -260,11 +262,12 @@ func (m *manifestMergeManager) groupBySpec(manifests []iceberg.ManifestFile) map
        return groups
 }
 
-func (m *manifestMergeManager) createManifest(specID int, bin 
[]iceberg.ManifestFile) (iceberg.ManifestFile, error) {
+func (m *manifestMergeManager) createManifest(specID int, bin 
[]iceberg.ManifestFile) (mf iceberg.ManifestFile, err error) {
        wr, path, counter, err := m.snap.newManifestWriter(m.snap.spec(specID))
        if err != nil {
                return nil, err
        }
+       defer internal.CheckedClose(wr, &err)
 
        for _, manifest := range bin {
                entries, err := m.snap.fetchManifestEntry(manifest, false)
@@ -276,19 +279,17 @@ func (m *manifestMergeManager) createManifest(specID int, bin []iceberg.Manifest
                        switch {
                        case entry.Status() == iceberg.EntryStatusDELETED && 
entry.SnapshotID() == m.snap.snapshotID:
                                // only files deleted by this snapshot should 
be added to the new manifest
-                               if err = wr.Delete(entry); err != nil {
-                                       return nil, err
-                               }
+                               err = wr.Delete(entry)
                        case entry.Status() == iceberg.EntryStatusADDED && 
entry.SnapshotID() == m.snap.snapshotID:
                                // added entries from this snapshot are still 
added, otherwise they should be existing
-                               if err = wr.Add(entry); err != nil {
-                                       return nil, err
-                               }
+                               err = wr.Add(entry)
                        case entry.Status() != iceberg.EntryStatusDELETED:
                                // add all non-deleted files from the old 
manifest as existing files
-                               if err = wr.Existing(entry); err != nil {
-                                       return nil, err
-                               }
+                               err = wr.Existing(entry)
+                       }
+
+                       if err != nil {
+                               return nil, err
                        }
                }
        }
@@ -522,7 +523,7 @@ func (sp *snapshotProducer) manifests() (_ []iceberg.ManifestFile, err error) {
        results := [...][]iceberg.ManifestFile{nil, nil, nil}
 
        if len(sp.addedFiles) > 0 {
-               g.Go(func() error {
+               g.Go(func() (err error) {
                        out, path, err := sp.newManifestOutput()
                        if err != nil {
                                return err
@@ -540,6 +541,7 @@ func (sp *snapshotProducer) manifests() (_ []iceberg.ManifestFile, err error) {
                        if err != nil {
                                return err
                        }
+                       defer internal.CheckedClose(wr, &err)
 
                        for _, df := range sp.addedFiles {
                                err := 
wr.Add(iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &sp.snapshotID,
diff --git a/table/snapshot_producers_test.go b/table/snapshot_producers_test.go
new file mode 100644
index 00000000..0866f2f4
--- /dev/null
+++ b/table/snapshot_producers_test.go
@@ -0,0 +1,286 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "bytes"
+       "context"
+       "errors"
+       "io"
+       "io/fs"
+       "testing"
+       "time"
+
+       "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/internal"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/stretchr/testify/require"
+)
+
+var errLimitedWrite = errors.New("write limit exceeded")
+
+type limitedWriteCloser struct {
+       limit   int
+       written int
+       err     error
+}
+
+func (w *limitedWriteCloser) Write(p []byte) (int, error) {
+       if w.written+len(p) > w.limit {
+               return 0, w.err
+       }
+       w.written += len(p)
+
+       return len(p), nil
+}
+
+func (w *limitedWriteCloser) Close() error {
+       return nil
+}
+
+func (w *limitedWriteCloser) ReadFrom(r io.Reader) (int64, error) {
+       return io.Copy(w, r)
+}
+
+type memIO struct {
+       limit int
+       err   error
+       files map[string][]byte
+}
+
+func newMemIO(limit int, err error) *memIO {
+       return &memIO{
+               limit: limit,
+               err:   err,
+               files: make(map[string][]byte),
+       }
+}
+
+func (m *memIO) Open(name string) (iceio.File, error) {
+       data, ok := m.files[name]
+       if !ok {
+               return nil, fs.ErrNotExist
+       }
+
+       return &internal.MockFile{Contents: bytes.NewReader(data)}, nil
+}
+
+func (m *memIO) Create(name string) (iceio.FileWriter, error) {
+       return &limitedWriteCloser{limit: m.limit, err: m.err}, nil
+}
+
+func (m *memIO) WriteFile(name string, content []byte) error {
+       m.files[name] = append([]byte(nil), content...)
+
+       return nil
+}
+
+func (m *memIO) Remove(name string) error {
+       delete(m.files, name)
+
+       return nil
+}
+
+func manifestHeaderSize(t *testing.T, version int, spec iceberg.PartitionSpec, 
schema *iceberg.Schema) int {
+       t.Helper()
+
+       var buf bytes.Buffer
+       writer, err := iceberg.NewManifestWriter(version, &buf, spec, schema, 1)
+       require.NoError(t, err, "new manifest writer")
+       _ = writer.Close()
+
+       return buf.Len()
+}
+
+func manifestSize(t *testing.T, version int, spec iceberg.PartitionSpec, 
schema *iceberg.Schema, snapshotID int64, entries []iceberg.ManifestEntry) int {
+       t.Helper()
+
+       var buf bytes.Buffer
+       _, err := iceberg.WriteManifest("size.avro", &buf, version, spec, 
schema, snapshotID, entries)
+       require.NoError(t, err, "write manifest for size")
+
+       return buf.Len()
+}
+
+func newTestDataFile(t *testing.T, spec iceberg.PartitionSpec, path string, 
partition map[int]any) iceberg.DataFile {
+       t.Helper()
+
+       builder, err := iceberg.NewDataFileBuilder(
+               spec,
+               iceberg.EntryContentData,
+               path,
+               iceberg.ParquetFile,
+               partition,
+               nil,
+               nil,
+               1,
+               1,
+       )
+       require.NoError(t, err, "new data file builder")
+
+       return builder.Build()
+}
+
+func TestSnapshotProducerManifestsClosesWriterOnError(t *testing.T) {
+       schema := iceberg.NewSchema(0, iceberg.NestedField{
+               ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32, 
Required: true,
+       })
+       spec := iceberg.NewPartitionSpec(iceberg.PartitionField{
+               SourceID: 1, FieldID: 1000, Name: "id", Transform: 
iceberg.IdentityTransform{},
+       })
+
+       meta, err := NewMetadata(schema, &spec, UnsortedSortOrder, 
"table-location", nil)
+       require.NoError(t, err, "new metadata")
+       spec = meta.PartitionSpec()
+       schema = meta.CurrentSchema()
+       fieldID := 0
+       for field := range spec.Fields() {
+               fieldID = field.FieldID
+
+               break
+       }
+       require.NotZero(t, fieldID, "partition field id")
+
+       mem := newMemIO(manifestHeaderSize(t, 2, spec, schema), errLimitedWrite)
+       tbl := New(Identifier{"db", "tbl"}, meta, "metadata.json", 
func(context.Context) (iceio.IO, error) {
+               return mem, nil
+       }, nil)
+       txn := tbl.NewTransaction()
+
+       sp := newFastAppendFilesProducer(OpAppend, txn, mem, nil, nil)
+       validPartition := map[int]any{fieldID: int32(1)}
+       sp.appendDataFile(newTestDataFile(t, spec, "file://data-1.parquet", 
validPartition))
+       sp.appendDataFile(newTestDataFile(t, spec, "file://data-2.parquet", 
nil))
+
+       _, err = sp.manifests()
+       require.ErrorIs(t, err, errLimitedWrite)
+}
+
+func TestManifestMergeManagerClosesWriterOnError(t *testing.T) {
+       schema := iceberg.NewSchema(0, iceberg.NestedField{
+               ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32, 
Required: true,
+       })
+       spec := iceberg.NewPartitionSpec()
+
+       mem := newMemIO(manifestHeaderSize(t, 2, spec, schema), errLimitedWrite)
+       meta, err := NewMetadata(schema, &spec, UnsortedSortOrder, 
"table-location", nil)
+       require.NoError(t, err, "new metadata")
+
+       tbl := New(Identifier{"db", "tbl"}, meta, "metadata.json", 
func(context.Context) (iceio.IO, error) {
+               return mem, nil
+       }, nil)
+       txn := tbl.NewTransaction()
+
+       sp := newFastAppendFilesProducer(OpAppend, txn, mem, nil, nil)
+       df := newTestDataFile(t, spec, "file://data-1.parquet", nil)
+       entries := []iceberg.ManifestEntry{
+               iceberg.NewManifestEntry(iceberg.EntryStatusADDED, 
&sp.snapshotID, nil, nil, df),
+       }
+
+       manifestPath := "table-location/metadata/manifest-1.avro"
+       var manifestBuf bytes.Buffer
+       manifestFile, err := iceberg.WriteManifest(manifestPath, &manifestBuf, 
2, spec, schema, sp.snapshotID, entries)
+       require.NoError(t, err, "write manifest")
+       require.NoError(t, mem.WriteFile(manifestPath, manifestBuf.Bytes()))
+
+       missingManifest := iceberg.NewManifestFile(2, 
"table-location/metadata/missing.avro", 1, int32(spec.ID()), sp.snapshotID).
+               Build()
+
+       mgr := manifestMergeManager{snap: sp}
+       _, err = mgr.createManifest(spec.ID(), []iceberg.ManifestFile{
+               manifestFile,
+               missingManifest,
+       })
+       require.ErrorIs(t, err, errLimitedWrite)
+}
+
+func TestOverwriteFilesExistingManifestsClosesWriterOnError(t *testing.T) {
+       schema := iceberg.NewSchema(0, iceberg.NestedField{
+               ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32, 
Required: true,
+       })
+       spec := iceberg.NewPartitionSpec(iceberg.PartitionField{
+               SourceID: 1, FieldID: 1000, Name: "id", Transform: 
iceberg.IdentityTransform{},
+       })
+
+       meta, err := NewMetadata(schema, &spec, UnsortedSortOrder, 
"table-location", nil)
+       require.NoError(t, err, "new metadata")
+
+       spec = meta.PartitionSpec()
+       schema = meta.CurrentSchema()
+       fieldID := 0
+       for field := range spec.Fields() {
+               fieldID = field.FieldID
+
+               break
+       }
+       require.NotZero(t, fieldID, "partition field id")
+
+       snapshotID := int64(100)
+       seqNum := int64(-1)
+       validSeq := int64(42)
+       manifestPath := "table-location/metadata/manifest-1.avro"
+       manifestListPath := "table-location/metadata/snap-1.avro"
+
+       validPartition := map[int]any{fieldID: int32(1)}
+       validFile := newTestDataFile(t, spec, "file://valid.parquet", 
validPartition)
+       sizeEntries := []iceberg.ManifestEntry{
+               iceberg.NewManifestEntry(iceberg.EntryStatusEXISTING, 
&snapshotID, &validSeq, nil, validFile),
+       }
+       headerLen := manifestHeaderSize(t, 2, spec, schema)
+       manifestLen := manifestSize(t, 2, spec, schema, snapshotID, sizeEntries)
+       require.Greater(t, manifestLen, headerLen, "manifest size")
+
+       mem := newMemIO(manifestLen-1, errLimitedWrite)
+       tbl := New(Identifier{"db", "tbl"}, meta, "metadata.json", 
func(context.Context) (iceio.IO, error) {
+               return mem, nil
+       }, nil)
+       txn := tbl.NewTransaction()
+
+       deletedFile := newTestDataFile(t, spec, "file://deleted.parquet", 
validPartition)
+       invalidFile := newTestDataFile(t, spec, "file://invalid.parquet", 
validPartition)
+       entries := []iceberg.ManifestEntry{
+               iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapshotID, 
&validSeq, nil, deletedFile),
+               iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapshotID, 
&validSeq, nil, validFile),
+               iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapshotID, 
nil, nil, invalidFile),
+       }
+
+       var manifestBuf bytes.Buffer
+       manifestFile, err := iceberg.WriteManifest(manifestPath, &manifestBuf, 
2, spec, schema, snapshotID, entries)
+       require.NoError(t, err, "write manifest")
+       require.NoError(t, mem.WriteFile(manifestPath, manifestBuf.Bytes()))
+
+       var listBuf bytes.Buffer
+       err = iceberg.WriteManifestList(2, &listBuf, snapshotID, nil, &seqNum, 
0, []iceberg.ManifestFile{manifestFile})
+       require.NoError(t, err, "write manifest list")
+       require.NoError(t, mem.WriteFile(manifestListPath, listBuf.Bytes()))
+
+       snap := Snapshot{
+               SnapshotID:     snapshotID,
+               SequenceNumber: seqNum,
+               TimestampMs:    time.Now().UnixMilli(),
+               ManifestList:   manifestListPath,
+       }
+       txn.meta.snapshotList = []Snapshot{snap}
+       txn.meta.currentSnapshotID = &snapshotID
+
+       sp := newOverwriteFilesProducer(OpOverwrite, txn, mem, nil, nil)
+       sp.deleteDataFile(deletedFile)
+
+       _, err = sp.existingManifests()
+       require.ErrorIs(t, err, errLimitedWrite)
+}

Reply via email to