This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new f192c8e5 fix(manifest): handle lifecycle of the decoder in reader (#766)
f192c8e5 is described below
commit f192c8e504ebc7e7a3b66044d06be565efb2764e
Author: ferhat elmas <[email protected]>
AuthorDate: Mon Mar 2 23:25:59 2026 +0100
fix(manifest): handle lifecycle of the decoder in reader (#766)
related to #721
* remove the premature decoder close in the constructor so that the reader
can actually read the entries
* add an explicit Close method on ManifestReader for resource cleanup
* call Close in ReadManifest to prevent a decoder leak
* add a zstd-codec-based regression test
Signed-off-by: ferhat elmas <[email protected]>
---
manifest.go | 12 +++++++++---
manifest_test.go | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 65 insertions(+), 3 deletions(-)
diff --git a/manifest.go b/manifest.go
index d8c0e79e..3c135d1d 100644
--- a/manifest.go
+++ b/manifest.go
@@ -605,9 +605,6 @@ func NewManifestReader(file ManifestFile, in io.Reader)
(*ManifestReader, error)
if err != nil {
return nil, err
}
- defer func() {
- _ = dec.Close()
- }()
metadata := dec.Metadata()
sc := dec.Schema()
@@ -669,6 +666,11 @@ func NewManifestReader(file ManifestFile, in io.Reader)
(*ManifestReader, error)
}, nil
}
+// Close releases decoder resources associated with this manifest reader.
+func (c *ManifestReader) Close() error {
+ return c.dec.Close()
+}
+
// Version returns the file's format version.
func (c *ManifestReader) Version() int {
return c.formatVersion
@@ -779,6 +781,10 @@ func ReadManifest(m ManifestFile, f io.Reader,
discardDeleted bool) ([]ManifestE
if err != nil {
return nil, err
}
+ defer func() {
+ _ = manifestReader.Close()
+ }()
+
var results []ManifestEntry
for {
entry, err := manifestReader.ReadEntry()
diff --git a/manifest_test.go b/manifest_test.go
index b94b47ae..dfe3eb5a 100644
--- a/manifest_test.go
+++ b/manifest_test.go
@@ -1274,6 +1274,62 @@ func (m *ManifestTestSuite) TestManifestEntriesV3() {
m.Zero(*datafile.SortOrderID())
}
+func (m *ManifestTestSuite) TestNewManifestReaderZstdManifestEntriesV2() {
+ manifest := manifestFile{
+ version: 2,
+ SpecID: 1,
+ Path: manifestFileRecordsV2[0].FilePath(),
+ }
+
+ partitionSpec := NewPartitionSpecID(1,
+ PartitionField{FieldID: 1000, SourceID: 1, Name: "VendorID",
Transform: IdentityTransform{}},
+ PartitionField{FieldID: 1001, SourceID: 2, Name:
"tpep_pickup_datetime", Transform: IdentityTransform{}})
+
+ partitionSchema, err :=
partitionTypeToAvroSchema(partitionSpec.PartitionType(testSchema))
+ m.Require().NoError(err)
+
+ entrySchema, err := internal.NewManifestEntrySchema(partitionSchema, 2)
+ m.Require().NoError(err)
+
+ mw := ManifestWriter{
+ version: 2,
+ spec: partitionSpec,
+ schema: testSchema,
+ content: ManifestContentData,
+ }
+ md, err := mw.meta()
+ m.Require().NoError(err)
+
+ var buf bytes.Buffer
+ enc, err := ocf.NewEncoderWithSchema(entrySchema, &buf,
+ ocf.WithSchemaMarshaler(ocf.FullSchemaMarshaler),
+ ocf.WithEncoderSchemaCache(&avro.SchemaCache{}),
+ ocf.WithMetadata(md),
+ ocf.WithCodec(ocf.ZStandard))
+ m.Require().NoError(err)
+
+ m.Require().NoError(enc.Encode(manifestEntryV2Records[0]))
+ m.Require().NoError(enc.Encode(manifestEntryV2Records[1]))
+ m.Require().NoError(enc.Close())
+
+ manifestReader, err := NewManifestReader(&manifest,
bytes.NewReader(buf.Bytes()))
+ m.Require().NoError(err)
+ defer func() {
+ m.Require().NoError(manifestReader.Close())
+ }()
+
+ entry1, err := manifestReader.ReadEntry()
+ m.Require().NoError(err)
+ m.Equal(manifestEntryV2Records[0].DataFile().FilePath(),
entry1.DataFile().FilePath())
+
+ entry2, err := manifestReader.ReadEntry()
+ m.Require().NoError(err)
+ m.Equal(manifestEntryV2Records[1].DataFile().FilePath(),
entry2.DataFile().FilePath())
+
+ _, err = manifestReader.ReadEntry()
+ m.Require().ErrorIs(err, io.EOF)
+}
+
func (m *ManifestTestSuite) TestManifestEntryBuilder() {
dataFileBuilder, err := NewDataFileBuilder(
NewPartitionSpec(),