This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new f192c8e5 fix(manifest): handle lifecycle of the decoder in reader (#766)
f192c8e5 is described below

commit f192c8e504ebc7e7a3b66044d06be565efb2764e
Author: ferhat elmas <[email protected]>
AuthorDate: Mon Mar 2 23:25:59 2026 +0100

    fix(manifest): handle lifecycle of the decoder in reader (#766)
    
    related to #721
    
    * remove the premature decoder close in the constructor so that the
    reader can actually read the entries
    
    * add an explicit Close method for resource cleanup
    
    * call Close in ReadManifest to prevent a resource leak
    
    * add a zstd-codec-based regression test
    
    Signed-off-by: ferhat elmas <[email protected]>
---
 manifest.go      | 12 +++++++++---
 manifest_test.go | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 65 insertions(+), 3 deletions(-)

diff --git a/manifest.go b/manifest.go
index d8c0e79e..3c135d1d 100644
--- a/manifest.go
+++ b/manifest.go
@@ -605,9 +605,6 @@ func NewManifestReader(file ManifestFile, in io.Reader) (*ManifestReader, error)
        if err != nil {
                return nil, err
        }
-       defer func() {
-               _ = dec.Close()
-       }()
 
        metadata := dec.Metadata()
        sc := dec.Schema()
@@ -669,6 +666,11 @@ func NewManifestReader(file ManifestFile, in io.Reader) (*ManifestReader, error)
        }, nil
 }
 
+// Close releases decoder resources associated with this manifest reader.
+func (c *ManifestReader) Close() error {
+       return c.dec.Close()
+}
+
 // Version returns the file's format version.
 func (c *ManifestReader) Version() int {
        return c.formatVersion
@@ -779,6 +781,10 @@ func ReadManifest(m ManifestFile, f io.Reader, discardDeleted bool) ([]ManifestE
        if err != nil {
                return nil, err
        }
+       defer func() {
+               _ = manifestReader.Close()
+       }()
+
        var results []ManifestEntry
        for {
                entry, err := manifestReader.ReadEntry()
diff --git a/manifest_test.go b/manifest_test.go
index b94b47ae..dfe3eb5a 100644
--- a/manifest_test.go
+++ b/manifest_test.go
@@ -1274,6 +1274,62 @@ func (m *ManifestTestSuite) TestManifestEntriesV3() {
        m.Zero(*datafile.SortOrderID())
 }
 
+func (m *ManifestTestSuite) TestNewManifestReaderZstdManifestEntriesV2() {
+       manifest := manifestFile{
+               version: 2,
+               SpecID:  1,
+               Path:    manifestFileRecordsV2[0].FilePath(),
+       }
+
+       partitionSpec := NewPartitionSpecID(1,
+               PartitionField{FieldID: 1000, SourceID: 1, Name: "VendorID", Transform: IdentityTransform{}},
+               PartitionField{FieldID: 1001, SourceID: 2, Name: "tpep_pickup_datetime", Transform: IdentityTransform{}})
+
+       partitionSchema, err := partitionTypeToAvroSchema(partitionSpec.PartitionType(testSchema))
+       m.Require().NoError(err)
+
+       entrySchema, err := internal.NewManifestEntrySchema(partitionSchema, 2)
+       m.Require().NoError(err)
+
+       mw := ManifestWriter{
+               version: 2,
+               spec:    partitionSpec,
+               schema:  testSchema,
+               content: ManifestContentData,
+       }
+       md, err := mw.meta()
+       m.Require().NoError(err)
+
+       var buf bytes.Buffer
+       enc, err := ocf.NewEncoderWithSchema(entrySchema, &buf,
+               ocf.WithSchemaMarshaler(ocf.FullSchemaMarshaler),
+               ocf.WithEncoderSchemaCache(&avro.SchemaCache{}),
+               ocf.WithMetadata(md),
+               ocf.WithCodec(ocf.ZStandard))
+       m.Require().NoError(err)
+
+       m.Require().NoError(enc.Encode(manifestEntryV2Records[0]))
+       m.Require().NoError(enc.Encode(manifestEntryV2Records[1]))
+       m.Require().NoError(enc.Close())
+
+       manifestReader, err := NewManifestReader(&manifest, bytes.NewReader(buf.Bytes()))
+       m.Require().NoError(err)
+       defer func() {
+               m.Require().NoError(manifestReader.Close())
+       }()
+
+       entry1, err := manifestReader.ReadEntry()
+       m.Require().NoError(err)
+       m.Equal(manifestEntryV2Records[0].DataFile().FilePath(), entry1.DataFile().FilePath())
+
+       entry2, err := manifestReader.ReadEntry()
+       m.Require().NoError(err)
+       m.Equal(manifestEntryV2Records[1].DataFile().FilePath(), entry2.DataFile().FilePath())
+
+       _, err = manifestReader.ReadEntry()
+       m.Require().ErrorIs(err, io.EOF)
+}
+
 func (m *ManifestTestSuite) TestManifestEntryBuilder() {
        dataFileBuilder, err := NewDataFileBuilder(
                NewPartitionSpec(),

Reply via email to