This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 19bec10a feat: puffin Reader and Writer (#676)
19bec10a is described below
commit 19bec10ab3a697e604000d0c52cf58c5d6f8c6b7
Author: Shreyas Mishra <[email protected]>
AuthorDate: Tue Feb 24 21:55:18 2026 +0530
feat: puffin Reader and Writer (#676)
for #589
- Added PuffinWriter
- Added PuffinReader
- uncompressed only
---------
Signed-off-by: Shreyas220 <[email protected]>
---
dev/release/rat_exclude_files.txt | 6 +-
puffin/puffin.go | 81 +++
puffin/puffin_reader.go | 375 ++++++++++++++
puffin/puffin_test.go | 557 +++++++++++++++++++++
puffin/puffin_writer.go | 238 +++++++++
puffin/testdata/README.md | 21 +
puffin/testdata/empty-puffin-uncompressed.bin | Bin 0 -> 32 bytes
.../sample-metric-data-compressed-zstd.bin | Bin 0 -> 417 bytes
.../testdata/sample-metric-data-uncompressed.bin | Bin 0 -> 355 bytes
9 files changed, 1277 insertions(+), 1 deletion(-)
diff --git a/dev/release/rat_exclude_files.txt
b/dev/release/rat_exclude_files.txt
index 08ab050c..c9ff5498 100644
--- a/dev/release/rat_exclude_files.txt
+++ b/dev/release/rat_exclude_files.txt
@@ -21,4 +21,8 @@ NOTICE
go.sum
build
rat-results.txt
-operation_string.go
\ No newline at end of file
+operation_string.go
+empty-puffin-uncompressed.bin
+sample-metric-data-uncompressed.bin
+sample-metric-data-compressed-zstd.bin
+puffin/testdata/*
\ No newline at end of file
diff --git a/puffin/puffin.go b/puffin/puffin.go
new file mode 100644
index 00000000..85e8fbe1
--- /dev/null
+++ b/puffin/puffin.go
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Package puffin provides reading and writing of Puffin files.
+//
+// Puffin is a file format designed to store statistics and indexes
+// for Iceberg tables. A Puffin file contains blobs (opaque byte sequences)
+// with associated metadata, such as Apache DataSketches or deletion vectors.
+//
+// File structure:
+//
+// [Magic] [Blob]* [Magic] [Footer Payload] [Footer Payload Size] [Flags]
[Magic]
+//
+// See the specification at https://iceberg.apache.org/puffin-spec/
+package puffin
+
+var magic = [4]byte{'P', 'F', 'A', '1'}
+
+const (
+ // [Magic] [FooterPayload] [FooterPayloadSize] [Flags] [Magic]
+ // MagicSize is the number of bytes in the magic marker.
+ MagicSize = 4
+
+ // footerTrailerSize accounts for footer length (4) + flags (4) +
trailing magic (4).
+ footerTrailerSize = 12
+
+ // FooterFlagCompressed indicates a compressed footer; unsupported in
this implementation.
+ FooterFlagCompressed = 1 // bit 0
+
+ // DefaultMaxBlobSize is the maximum blob size allowed when reading
(256 MB). It guards against OOM from maliciously large blob lengths.
+ // Override with WithMaxBlobSize when creating a reader.
+ DefaultMaxBlobSize = 256 << 20
+
+ // CreatedBy is the footer property key whose value is a human-readable
identification of the application writing the file, along with its version.
+ // Example: "Trino version 381".
+ CreatedBy = "created-by"
+)
+
+type BlobMetadata struct {
+ Type BlobType `json:"type"`
+ Fields []int32 `json:"fields"`
+ SnapshotID int64 `json:"snapshot-id"`
+ SequenceNumber int64 `json:"sequence-number"`
+ Offset int64 `json:"offset"`
+ Length int64 `json:"length"`
+ CompressionCodec *string `json:"compression-codec,omitempty"`
+ Properties map[string]string `json:"properties,omitempty"`
+}
+
+// Footer describes the blobs and file-level properties stored in a Puffin
file.
+type Footer struct {
+ Blobs []BlobMetadata `json:"blobs"`
+ Properties map[string]string `json:"properties,omitempty"`
+}
+
+type BlobType string
+
+const (
+ // BlobTypeDataSketchesTheta is a serialized compact Theta sketch
+ // produced by the Apache DataSketches library.
+ BlobTypeDataSketchesTheta BlobType = "apache-datasketches-theta-v1"
+
+ // BlobTypeDeletionVector is a serialized deletion vector per the
+ // Iceberg spec. Requires snapshot-id and sequence-number to be -1.
+ BlobTypeDeletionVector BlobType = "deletion-vector-v1"
+)
diff --git a/puffin/puffin_reader.go b/puffin/puffin_reader.go
new file mode 100644
index 00000000..d178a2ab
--- /dev/null
+++ b/puffin/puffin_reader.go
@@ -0,0 +1,375 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package puffin
+
+import (
+ "bytes"
+ "encoding/binary"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "sort"
+)
+
+// ReaderAtSeeker combines io.ReaderAt and io.Seeker for reading Puffin files.
+// This interface is implemented by *os.File, *bytes.Reader, and similar types.
+type ReaderAtSeeker interface {
+ io.ReaderAt
+ io.Seeker
+}
+
+// Reader reads blobs and metadata from a Puffin file.
+//
+// Usage:
+//
+// r, err := puffin.NewReader(file)
+// if err != nil {
+// return err
+// }
+// for i := range r.Blobs() {
+// blob, err := r.ReadBlob(i)
+// // process blob.Data
+// }
+type Reader struct {
+ r ReaderAtSeeker
+ size int64
+ footer Footer
+ footerStart int64 // cached after ReadFooter
+ maxBlobSize int64
+}
+
+// BlobData pairs a blob's metadata with its content.
+type BlobData struct {
+ Metadata BlobMetadata
+ Data []byte
+}
+
+// ReaderOption configures a Reader.
+type ReaderOption func(*Reader)
+
+// WithMaxBlobSize sets the maximum blob size allowed when reading.
+// This prevents OOM attacks from malicious files with huge blob lengths.
+// Default is DefaultMaxBlobSize (256 MB).
+func WithMaxBlobSize(size int64) ReaderOption {
+ return func(r *Reader) {
+ r.maxBlobSize = size
+ }
+}
+
+// NewReader creates a new Puffin file reader.
+// The file size is auto-detected using Seek.
+// It validates magic bytes and reads the footer eagerly.
+// The caller is responsible for closing the underlying reader.
+func NewReader(r ReaderAtSeeker, opts ...ReaderOption) (*Reader, error) {
+ if r == nil {
+ return nil, errors.New("puffin: reader is nil")
+ }
+
+ // Auto-detect file size
+ size, err := r.Seek(0, io.SeekEnd)
+ if err != nil {
+ return nil, fmt.Errorf("puffin: detect file size: %w", err)
+ }
+
+ // Minimum size: header magic + footer magic + footer trailer
+ // [Magic] + zero for blob + [Magic] + [FooterPayloadSize (assuming
~0)] + [Flags] + [Magic]
+ minSize := int64(MagicSize + MagicSize + footerTrailerSize)
+ if size < minSize {
+ return nil, fmt.Errorf("puffin: file too small (%d bytes,
minimum %d)", size, minSize)
+ }
+
+ // Validate header magic
+ var headerMagic [MagicSize]byte
+ if _, err := r.ReadAt(headerMagic[:], 0); err != nil {
+ return nil, fmt.Errorf("puffin: read header magic: %w", err)
+ }
+ if !bytes.Equal(headerMagic[:], magic[:]) {
+ return nil, errors.New("puffin: invalid header magic")
+ }
+
+ pr := &Reader{
+ r: r,
+ size: size,
+ maxBlobSize: DefaultMaxBlobSize,
+ }
+
+ for _, opt := range opts {
+ opt(pr)
+ }
+
+ // Read footer
+ if err := pr.readFooter(); err != nil {
+ return nil, err
+ }
+
+ return pr, nil
+}
+
+// Blobs returns the blob metadata entries from the footer.
+func (r *Reader) Blobs() []BlobMetadata {
+ return r.footer.Blobs
+}
+
+// Properties returns the file-level properties from the footer.
+func (r *Reader) Properties() map[string]string {
+ return r.footer.Properties
+}
+
+// defaultFooterReadSize is the initial read size when reading the footer.
+// We read more than strictly needed to hopefully get the entire footer
+// in one read, reducing round-trips on cloud object storage.
+const defaultFooterReadSize = 8 * 1024 // 8 KB
+
+// readFooter reads and parses the footer from the Puffin file.
+func (r *Reader) readFooter() error {
+ // Read a larger chunk from the end to minimize round-trips on cloud
storage.
+ // This often captures the entire footer in one read.
+ readSize := min(int64(defaultFooterReadSize), r.size)
+ buf := make([]byte, readSize)
+ if _, err := r.r.ReadAt(buf, r.size-readSize); err != nil {
+ return fmt.Errorf("puffin: read footer region: %w", err)
+ }
+
+ // Parse trailer from end of buffer: PayloadSize(4) + Flags(4) +
Magic(4)
+ trailer := buf[len(buf)-footerTrailerSize:]
+
+ // Validate trailing magic
+ if !bytes.Equal(trailer[8:12], magic[:]) {
+ return errors.New("puffin: invalid trailing magic in footer")
+ }
+
+ // Extract payload size and flags
+ payloadSize := int64(binary.LittleEndian.Uint32(trailer[0:4]))
+ flags := binary.LittleEndian.Uint32(trailer[4:8])
+
+ // Check for compressed footer (unsupported)
+ if flags&FooterFlagCompressed != 0 {
+ return errors.New("puffin: compressed footer not supported")
+ }
+
+ // Check for unknown flags
+ if flags&^uint32(FooterFlagCompressed) != 0 {
+ return fmt.Errorf("puffin: unknown footer flags set: 0x%x",
flags)
+ }
+
+ // Validate payload size
+ if payloadSize < 0 {
+ return fmt.Errorf("puffin: invalid footer payload size %d",
payloadSize)
+ }
+
+ // Calculate footer start position
+ // Layout: [header magic (4)] [blobs...] [footer magic (4)] [JSON
(payloadSize)] [trailer (12)]
+ footerStart := r.size - footerTrailerSize - payloadSize - MagicSize
+ if footerStart < MagicSize {
+ return fmt.Errorf("puffin: footer payload size %d exceeds
available space", payloadSize)
+ }
+
+ // Total footer size: magic(4) + payload + trailer(12)
+ totalFooterSize := MagicSize + payloadSize + footerTrailerSize
+
+ // Validate footer start magic
+ if totalFooterSize <= readSize {
+ // We already have the footer magic in buf
+ footerOffset := len(buf) - int(totalFooterSize)
+ if !bytes.Equal(buf[footerOffset:footerOffset+MagicSize],
magic[:]) {
+ return errors.New("puffin: invalid footer start magic")
+ }
+ } else {
+ // Footer is larger than our initial read, need to read magic
separately
+ var footerMagic [MagicSize]byte
+ if _, err := r.r.ReadAt(footerMagic[:], footerStart); err !=
nil {
+ return fmt.Errorf("puffin: read footer start magic:
%w", err)
+ }
+ if !bytes.Equal(footerMagic[:], magic[:]) {
+ return errors.New("puffin: invalid footer start magic")
+ }
+ }
+
+ payloadReader := io.NewSectionReader(r.r, footerStart+MagicSize,
payloadSize)
+ var footer Footer
+ if err := json.NewDecoder(payloadReader).Decode(&footer); err != nil {
+ return fmt.Errorf("puffin: decode footer JSON: %w", err)
+ }
+
+ // Validate blob metadata
+ if err := r.validateBlobs(footer.Blobs, footerStart); err != nil {
+ return err
+ }
+
+ r.footer = footer
+ r.footerStart = footerStart
+
+ return nil
+}
+
+// ReadBlob reads the content of a specific blob by index.
+// The footer is already available, having been read eagerly by NewReader.
+func (r *Reader) ReadBlob(index int) (*BlobData, error) {
+ footer := r.footer
+
+ if index < 0 || index >= len(footer.Blobs) {
+ return nil, fmt.Errorf("puffin: blob index %d out of range [0,
%d)", index, len(footer.Blobs))
+ }
+
+ meta := footer.Blobs[index]
+ data, err := r.readBlobData(meta)
+ if err != nil {
+ return nil, err
+ }
+
+ return &BlobData{Metadata: meta, Data: data}, nil
+}
+
+// ReadBlobByMetadata reads a blob using its metadata directly.
+// This is useful when you have metadata from an external source.
+func (r *Reader) ReadBlobByMetadata(meta BlobMetadata) ([]byte, error) {
+ return r.readBlobData(meta)
+}
+
+// readBlobData is the internal implementation for reading blob data.
+func (r *Reader) readBlobData(meta BlobMetadata) ([]byte, error) {
+ // Validate blob type
+ if meta.Type == "" {
+ return nil, errors.New("puffin: cannot read blob: type is
required")
+ }
+
+ // Check for compressed blob (unsupported)
+ if meta.CompressionCodec != nil && *meta.CompressionCodec != "" {
+ return nil, fmt.Errorf("puffin: cannot read blob: compression
%q not supported", *meta.CompressionCodec)
+ }
+
+ // Validate offset/length
+ if err := r.validateRange(meta.Offset, meta.Length); err != nil {
+ return nil, fmt.Errorf("puffin: blob: %w", err)
+ }
+
+ // Read blob data
+ data := make([]byte, meta.Length)
+ if _, err := r.r.ReadAt(data, meta.Offset); err != nil {
+ return nil, fmt.Errorf("puffin: read blob data: %w", err)
+ }
+
+ return data, nil
+}
+
+// ReadAllBlobs reads all blobs from the file.
+func (r *Reader) ReadAllBlobs() ([]*BlobData, error) {
+ footer := r.footer
+
+ if len(footer.Blobs) == 0 {
+ return nil, nil
+ }
+
+ // Create index mapping to preserve original order
+ type indexedBlob struct {
+ index int
+ meta BlobMetadata
+ }
+ indexed := make([]indexedBlob, len(footer.Blobs))
+ for i, meta := range footer.Blobs {
+ indexed[i] = indexedBlob{index: i, meta: meta}
+ }
+
+ // Sort by offset for sequential I/O
+ sort.Slice(indexed, func(i, j int) bool {
+ return indexed[i].meta.Offset < indexed[j].meta.Offset
+ })
+
+ // Read blobs in offset order, store in original order
+ results := make([]*BlobData, len(footer.Blobs))
+ for _, ib := range indexed {
+ data, err := r.readBlobData(ib.meta)
+ if err != nil {
+ return nil, fmt.Errorf("puffin: read blob %d: %w",
ib.index, err)
+ }
+ results[ib.index] = &BlobData{Metadata: ib.meta, Data: data}
+ }
+
+ return results, nil
+}
+
+// ReadAt implements io.ReaderAt, reading from the blob data region.
+// It validates that the read range is within the blob data region.
+// This is useful for the deletion vector use case, where the manifest
+// stores offset/length pointing directly into the Puffin file.
+func (r *Reader) ReadAt(p []byte, off int64) (n int, err error) {
+ if err := r.validateRange(off, int64(len(p))); err != nil {
+ return 0, fmt.Errorf("puffin: %w", err)
+ }
+
+ return r.r.ReadAt(p, off)
+}
+
+// validateRange validates offset and length for reading from the blob data
region.
+func (r *Reader) validateRange(offset, length int64) error {
+ if length < 0 {
+ return fmt.Errorf("invalid length %d", length)
+ }
+ if length > r.maxBlobSize {
+ return fmt.Errorf("size %d exceeds limit %d", length,
r.maxBlobSize)
+ }
+ if offset < MagicSize {
+ return fmt.Errorf("invalid offset %d (before header)", offset)
+ }
+
+ end := offset + length
+ if end < offset {
+ return fmt.Errorf("offset+length overflow: offset=%d
length=%d", offset, length)
+ }
+ if end > r.footerStart {
+ return fmt.Errorf("extends into footer: offset=%d length=%d
footerStart=%d",
+ offset, length, r.footerStart)
+ }
+
+ return nil
+}
+
+// validateBlobs validates all blob metadata entries.
+func (r *Reader) validateBlobs(blobs []BlobMetadata, footerStart int64) error {
+ for i, blob := range blobs {
+ // Type is required
+ if blob.Type == "" {
+ return fmt.Errorf("puffin: blob %d: type is required",
i)
+ }
+
+ // Length must be non-negative
+ if blob.Length < 0 {
+ return fmt.Errorf("puffin: blob %d: invalid length %d",
i, blob.Length)
+ }
+
+ // Offset must be after header magic
+ if blob.Offset < MagicSize {
+ return fmt.Errorf("puffin: blob %d: offset %d before
header", i, blob.Offset)
+ }
+
+ // Check for overflow
+ end := blob.Offset + blob.Length
+ if end < blob.Offset {
+ return fmt.Errorf("puffin: blob %d: offset+length
overflow: offset=%d length=%d", i, blob.Offset, blob.Length)
+ }
+
+ // Blob must not extend into footer
+ if end > footerStart {
+ return fmt.Errorf("puffin: blob %d: extends into
footer: offset=%d length=%d footerStart=%d",
+ i, blob.Offset, blob.Length, footerStart)
+ }
+ }
+
+ return nil
+}
diff --git a/puffin/puffin_test.go b/puffin/puffin_test.go
new file mode 100644
index 00000000..2752e15f
--- /dev/null
+++ b/puffin/puffin_test.go
@@ -0,0 +1,557 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package puffin_test
+
+import (
+ "bytes"
+ "math"
+ "os"
+ "path"
+ "testing"
+
+ "github.com/apache/iceberg-go/puffin"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+// --- Test Helpers ---
+
+func newWriter() (*puffin.Writer, *bytes.Buffer) {
+ buf := &bytes.Buffer{}
+ w, _ := puffin.NewWriter(buf)
+
+ return w, buf
+}
+
+func newReader(t *testing.T, buf *bytes.Buffer) *puffin.Reader {
+ r, err := puffin.NewReader(bytes.NewReader(buf.Bytes()))
+ require.NoError(t, err)
+
+ return r
+}
+
+func defaultBlobInput() puffin.BlobMetadataInput {
+ return puffin.BlobMetadataInput{
+ Type: puffin.BlobTypeDataSketchesTheta,
+ Fields: []int32{},
+ }
+}
+
+func validFile() []byte {
+ w, buf := newWriter()
+ w.Finish()
+
+ return buf.Bytes()
+}
+
+func validFileWithBlob() []byte {
+ w, buf := newWriter()
+ w.AddBlob(defaultBlobInput(), []byte("test data"))
+ w.Finish()
+
+ return buf.Bytes()
+}
+
+// --- Tests ---
+
+// TestRoundTrip verifies that data written by Writer can be read back by
Reader.
+// This is the core integration test ensuring the puffin format is correctly
implemented.
+func TestRoundTrip(t *testing.T) {
+ blob1Data := []byte("theta sketch data here")
+ blob2Data := []byte("another blob with different content")
+
+ w, buf := newWriter()
+ w.AddProperties(map[string]string{"test-property": "test-value"})
+
+ meta1, err := w.AddBlob(puffin.BlobMetadataInput{
+ Type: puffin.BlobTypeDataSketchesTheta,
+ SnapshotID: 123,
+ SequenceNumber: 1,
+ Fields: []int32{1, 2, 3},
+ Properties: map[string]string{"ndv": "1000"},
+ }, blob1Data)
+ require.NoError(t, err)
+
+ meta2, err := w.AddBlob(puffin.BlobMetadataInput{
+ Type: puffin.BlobTypeDeletionVector,
+ SnapshotID: -1,
+ SequenceNumber: -1,
+ Fields: []int32{},
+ Properties: map[string]string{"referenced-data-file":
"data/file.parquet"},
+ }, blob2Data)
+ require.NoError(t, err)
+ require.NoError(t, w.Finish())
+
+ r := newReader(t, buf)
+ blobs := r.Blobs()
+ props := r.Properties()
+
+ assert.Len(t, blobs, 2)
+ assert.Equal(t, "test-value", props["test-property"])
+ assert.Contains(t, props[puffin.CreatedBy], "iceberg-go")
+
+ // Verify blob 1
+ assert.Equal(t, puffin.BlobTypeDataSketchesTheta, blobs[0].Type)
+ assert.Equal(t, int64(123), blobs[0].SnapshotID)
+ assert.Equal(t, int64(1), blobs[0].SequenceNumber)
+ assert.Equal(t, []int32{1, 2, 3}, blobs[0].Fields)
+ assert.Equal(t, meta1.Offset, blobs[0].Offset)
+ assert.Equal(t, meta1.Length, blobs[0].Length)
+ assert.Equal(t, "1000", blobs[0].Properties["ndv"])
+
+ // Verify blob 2
+ assert.Equal(t, puffin.BlobTypeDeletionVector, blobs[1].Type)
+ assert.Equal(t, int64(-1), blobs[1].SnapshotID)
+ assert.Equal(t, int64(-1), blobs[1].SequenceNumber)
+ assert.Equal(t, meta2.Offset, blobs[1].Offset)
+ assert.Equal(t, meta2.Length, blobs[1].Length)
+
+ // Verify data
+ blobData1, _ := r.ReadBlob(0)
+ assert.Equal(t, blob1Data, blobData1.Data)
+
+ blobData2, _ := r.ReadBlob(1)
+ assert.Equal(t, blob2Data, blobData2.Data)
+
+ allBlobs, _ := r.ReadAllBlobs()
+ assert.Len(t, allBlobs, 2)
+ assert.Equal(t, blob1Data, allBlobs[0].Data)
+ assert.Equal(t, blob2Data, allBlobs[1].Data)
+}
+
+// TestEmptyFile verifies that a puffin file with no blobs is valid.
+// Empty files are valid per spec and used when no statistics exist yet.
+func TestEmptyFile(t *testing.T) {
+ w, buf := newWriter()
+ require.NoError(t, w.Finish())
+
+ r := newReader(t, buf)
+ assert.Len(t, r.Blobs(), 0)
+ assert.Contains(t, r.Properties()[puffin.CreatedBy], "iceberg-go")
+
+ blobs, err := r.ReadAllBlobs()
+ require.NoError(t, err)
+ assert.Nil(t, blobs)
+}
+
+// TestEmptyBlobData verifies that a zero-length blob is valid.
+// Some blob types may legitimately have empty content.
+func TestEmptyBlobData(t *testing.T) {
+ w, buf := newWriter()
+ meta, err := w.AddBlob(defaultBlobInput(), []byte{})
+ require.NoError(t, err)
+ assert.Equal(t, int64(0), meta.Length)
+ w.Finish()
+
+ r := newReader(t, buf)
+ blob, _ := r.ReadBlob(0)
+ assert.Empty(t, blob.Data)
+}
+
+// TestLargeFooter verifies reading works when footer exceeds initial 8KB read
buffer.
+// This exercises the code path where footer requires a second read from
storage.
+func TestLargeFooter(t *testing.T) {
+ w, buf := newWriter()
+ numBlobs := 200
+ for i := range numBlobs {
+ w.AddBlob(puffin.BlobMetadataInput{
+ Type: puffin.BlobTypeDataSketchesTheta,
+ SnapshotID: int64(i),
+ Fields: []int32{1, 2, 3, 4, 5},
+ Properties: map[string]string{"key":
"value-to-increase-footer-size"},
+ }, []byte("blob"))
+ }
+ w.Finish()
+
+ r := newReader(t, buf)
+ assert.Len(t, r.Blobs(), numBlobs)
+}
+
+// TestWriterValidation verifies that Writer rejects invalid input.
+// Proper validation prevents corrupt files and provides clear error messages.
+func TestWriterValidation(t *testing.T) {
+ // nil writer: Ensures graceful failure when no underlying writer is
provided.
+ t.Run("nil writer", func(t *testing.T) {
+ _, err := puffin.NewWriter(nil)
+ assert.ErrorContains(t, err, "nil")
+ })
+
+ // missing type: Blob type is required per spec to identify the blob
format.
+ t.Run("missing type", func(t *testing.T) {
+ w, _ := newWriter()
+ _, err := w.AddBlob(puffin.BlobMetadataInput{Type: "", Fields:
[]int32{}}, []byte("x"))
+ assert.ErrorContains(t, err, "type")
+ })
+
+ // nil fields: Fields slice must be non-nil per spec (empty slice is
valid).
+ t.Run("nil fields", func(t *testing.T) {
+ w, _ := newWriter()
+ _, err := w.AddBlob(puffin.BlobMetadataInput{Type:
puffin.BlobTypeDataSketchesTheta, Fields: nil}, []byte("x"))
+ assert.ErrorContains(t, err, "fields")
+ })
+
+ // add blob after finish: Enforces writer state machine - no writes
after finalization.
+ t.Run("add blob after finish", func(t *testing.T) {
+ w, _ := newWriter()
+ w.Finish()
+ _, err := w.AddBlob(defaultBlobInput(), []byte("x"))
+ assert.ErrorContains(t, err, "finalized")
+ })
+
+ // double finish: Prevents accidental double-write of footer corrupting
the file.
+ t.Run("double finish", func(t *testing.T) {
+ w, _ := newWriter()
+ w.Finish()
+ assert.ErrorContains(t, w.Finish(), "finalized")
+ })
+
+ // deletion vector invalid snapshot-id: Spec requires snapshot-id=-1
for deletion vectors.
+ t.Run("deletion vector invalid snapshot-id", func(t *testing.T) {
+ w, _ := newWriter()
+ _, err := w.AddBlob(puffin.BlobMetadataInput{
+ Type: puffin.BlobTypeDeletionVector, SnapshotID: 123,
SequenceNumber: -1, Fields: []int32{},
+ }, []byte("x"))
+ assert.ErrorContains(t, err, "snapshot-id")
+ })
+
+ // deletion vector invalid sequence-number: Spec requires
sequence-number=-1 for deletion vectors.
+ t.Run("deletion vector invalid sequence-number", func(t *testing.T) {
+ w, _ := newWriter()
+ _, err := w.AddBlob(puffin.BlobMetadataInput{
+ Type: puffin.BlobTypeDeletionVector, SnapshotID: -1,
SequenceNumber: 5, Fields: []int32{},
+ }, []byte("x"))
+ assert.ErrorContains(t, err, "sequence-number")
+ })
+}
+
+// TestSetCreatedBy verifies the SetCreatedBy method.
+// Allows applications to identify themselves in puffin files for debugging.
+func TestSetCreatedBy(t *testing.T) {
+ // custom value: Verifies custom application identifiers are preserved
in footer.
+ t.Run("custom value", func(t *testing.T) {
+ w, buf := newWriter()
+ require.NoError(t, w.SetCreatedBy("MyApp 1.0"))
+ w.Finish()
+ r := newReader(t, buf)
+ assert.Equal(t, "MyApp 1.0", r.Properties()[puffin.CreatedBy])
+ })
+
+ // empty rejected: Empty identifier provides no value and likely
indicates a bug.
+ t.Run("empty rejected", func(t *testing.T) {
+ w, _ := newWriter()
+ assert.ErrorContains(t, w.SetCreatedBy(""), "empty")
+ })
+
+ // after finish rejected: Enforces writer state machine consistency.
+ t.Run("after finish rejected", func(t *testing.T) {
+ w, _ := newWriter()
+ w.Finish()
+ assert.ErrorContains(t, w.SetCreatedBy("x"), "finalized")
+ })
+}
+
+// TestClearProperties verifies the ClearProperties method.
+// Allows resetting properties before finishing if initial values were wrong.
+func TestClearProperties(t *testing.T) {
+ w, buf := newWriter()
+ w.AddProperties(map[string]string{"key": "value"})
+ w.ClearProperties()
+ w.Finish()
+
+ r := newReader(t, buf)
+ _, exists := r.Properties()["key"]
+ assert.False(t, exists)
+}
+
+// TestAddPropertiesAfterFinish verifies AddProperties rejects calls after
finish.
+// Enforces writer state machine - properties cannot be added after footer is
written.
+func TestAddPropertiesAfterFinish(t *testing.T) {
+ w, _ := newWriter()
+ w.Finish()
+ assert.ErrorContains(t, w.AddProperties(map[string]string{"k": "v"}),
"finalized")
+}
+
+// TestReaderInvalidFile verifies that Reader rejects invalid/corrupt files.
+// Early detection of corruption prevents silent data loss or security issues.
+func TestReaderInvalidFile(t *testing.T) {
+ tests := []struct {
+ name string
+ data func() []byte
+ wantErr string
+ }{
+ // file too small: Minimum valid puffin file has header magic +
footer, rejects truncated files.
+ {"file too small", func() []byte { return []byte("tiny") },
"too small"},
+ // invalid header magic: First 4 bytes must be 'PFA1' to
identify puffin format.
+ {"invalid header magic", func() []byte {
+ d := validFile()
+ d[0] = 'X'
+
+ return d
+ }, "magic"},
+ // invalid trailing magic: Last 4 bytes must be 'PFA1' to
detect truncation.
+ {"invalid trailing magic", func() []byte {
+ d := validFile()
+ d[len(d)-1] = 'X'
+
+ return d
+ }, "magic"},
+ // invalid footer start magic: Footer section must start with
'PFA1' for integrity.
+ {"invalid footer start magic", func() []byte {
+ d := validFile()
+ d[4] = 'X'
+
+ return d
+ }, "magic"},
+ // unknown flags: Reject files with unsupported features to
avoid misinterpretation.
+ {"unknown flags", func() []byte {
+ d := validFile()
+ d[len(d)-8] = 0x80
+
+ return d
+ }, "unknown"},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ data := tt.data()
+ _, err := puffin.NewReader(bytes.NewReader(data))
+ assert.ErrorContains(t, err, tt.wantErr)
+ })
+ }
+
+ // nil reader: Ensures graceful failure when no underlying reader is
provided.
+ t.Run("nil reader", func(t *testing.T) {
+ _, err := puffin.NewReader(nil)
+ assert.ErrorContains(t, err, "nil")
+ })
+}
+
+// TestReaderBlobAccess verifies blob access methods work correctly.
+// Tests the primary API for retrieving blob data from puffin files.
+func TestReaderBlobAccess(t *testing.T) {
+ w, buf := newWriter()
+ blobs := [][]byte{[]byte("first"), []byte("second"), []byte("third")}
+ for _, b := range blobs {
+ w.AddBlob(defaultBlobInput(), b)
+ }
+ w.Finish()
+ r := newReader(t, buf)
+
+ // read by index: Primary access method for retrieving blobs
sequentially.
+ t.Run("read by index", func(t *testing.T) {
+ for i, expected := range blobs {
+ blob, _ := r.ReadBlob(i)
+ assert.Equal(t, expected, blob.Data)
+ }
+ })
+
+ // index out of range: Prevents panic and provides clear error for
invalid indices.
+ t.Run("index out of range", func(t *testing.T) {
+ _, err := r.ReadBlob(-1)
+ assert.Error(t, err)
+ _, err = r.ReadBlob(100)
+ assert.Error(t, err)
+ })
+
+ // read by metadata: Allows direct access when caller has metadata from
external source.
+ t.Run("read by metadata", func(t *testing.T) {
+ data, _ := r.ReadBlobByMetadata(r.Blobs()[1])
+ assert.Equal(t, blobs[1], data)
+ })
+
+ // read all preserves order: Ensures blobs are returned in same order
as written.
+ t.Run("read all preserves order", func(t *testing.T) {
+ all, _ := r.ReadAllBlobs()
+ for i, expected := range blobs {
+ assert.Equal(t, expected, all[i].Data)
+ }
+ })
+}
+
+// TestReadBlobByMetadataValidation verifies validation of blob metadata.
+// Prevents reading garbage data when metadata is corrupted or crafted
maliciously.
+func TestReadBlobByMetadataValidation(t *testing.T) {
+ data := validFileWithBlob()
+ r, _ := puffin.NewReader(bytes.NewReader(data))
+
+ tests := []struct {
+ name string
+ meta puffin.BlobMetadata
+ wantErr string
+ }{
+ // empty type: Type is required to interpret blob content
correctly.
+ {"empty type", puffin.BlobMetadata{Type: "", Offset: 4, Length:
1}, "type"},
+ // offset before header: Prevents reading magic bytes as blob
data.
+ {"offset before header", puffin.BlobMetadata{Type: "t", Offset:
0, Length: 1}, "header"},
+ // negative length: Invalid length could cause allocation
issues.
+ {"negative length", puffin.BlobMetadata{Type: "t", Offset: 4,
Length: -1}, "length"},
+ // extends into footer: Prevents reading footer JSON as blob
data.
+ {"extends into footer", puffin.BlobMetadata{Type: "t", Offset:
4, Length: 9999}, "footer"},
+ // overflow: Prevents integer overflow attacks in offset+length
calculation.
+ {"overflow", puffin.BlobMetadata{Type: "t", Offset:
math.MaxInt64, Length: 1}, "overflow"},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ _, err := r.ReadBlobByMetadata(tt.meta)
+ assert.ErrorContains(t, err, tt.wantErr)
+ })
+ }
+}
+
+// TestReadAt verifies the ReadAt method (io.ReaderAt implementation).
+// Enables partial reads for streaming or when only specific byte ranges are
needed.
+func TestReadAt(t *testing.T) {
+ w, buf := newWriter()
+ meta, _ := w.AddBlob(defaultBlobInput(), []byte("hello world"))
+ w.Finish()
+ r := newReader(t, buf)
+
+ // valid range: Verifies partial blob reads work correctly.
+ t.Run("valid range", func(t *testing.T) {
+ data := make([]byte, 5)
+ n, err := r.ReadAt(data, meta.Offset)
+ require.NoError(t, err)
+ assert.Equal(t, 5, n)
+ assert.Equal(t, []byte("hello"), data)
+ })
+
+ // extends into footer: Prevents reading footer metadata as blob
content.
+ t.Run("extends into footer", func(t *testing.T) {
+ data := make([]byte, buf.Len())
+ _, err := r.ReadAt(data, 4)
+ assert.ErrorContains(t, err, "footer")
+ })
+
+ // offset before header: Prevents reading file magic as blob content.
+ t.Run("offset before header", func(t *testing.T) {
+ data := make([]byte, 4)
+ _, err := r.ReadAt(data, 0)
+ assert.ErrorContains(t, err, "header")
+ })
+}
+
+// Integration tests against canonical Java-generated fixtures.
+// Source:
https://github.com/apache/iceberg/tree/main/core/src/test/resources/org/apache/iceberg/puffin/v1
+
+func readFixture(t *testing.T, name string) []byte {
+ t.Helper()
+ data, err := os.ReadFile(path.Join("testdata", name))
+ require.NoError(t, err)
+
+ return data
+}
+
+// TestReadJavaEmptyFile reads the canonical empty puffin file generated by
+// the Java implementation and verifies our reader parses it correctly.
+func TestReadJavaEmptyFile(t *testing.T) {
+ data := readFixture(t, "empty-puffin-uncompressed.bin")
+ r, err := puffin.NewReader(bytes.NewReader(data))
+ require.NoError(t, err)
+
+ assert.Empty(t, r.Blobs())
+ assert.Empty(t, r.Properties())
+}
+
+// TestReadJavaUncompressedMetricData reads the uncompressed metric data file
+// from the Java implementation and verifies all metadata and blob
+// contents match the expected values exactly.
+func TestReadJavaUncompressedMetricData(t *testing.T) {
+ data := readFixture(t, "sample-metric-data-uncompressed.bin")
+ r, err := puffin.NewReader(bytes.NewReader(data))
+ require.NoError(t, err)
+
+ // File-level properties
+ assert.Equal(t, "Test 1234", r.Properties()[puffin.CreatedBy])
+
+ // Should have exactly 2 blobs
+ blobs := r.Blobs()
+ require.Len(t, blobs, 2)
+
+ // Blob 0: type="some-blob"
+ assert.Equal(t, puffin.BlobType("some-blob"), blobs[0].Type)
+ assert.Equal(t, []int32{1}, blobs[0].Fields)
+ assert.Equal(t, int64(2), blobs[0].SnapshotID)
+ assert.Equal(t, int64(1), blobs[0].SequenceNumber)
+ assert.Equal(t, int64(4), blobs[0].Offset)
+ assert.Equal(t, int64(9), blobs[0].Length)
+ assert.Nil(t, blobs[0].CompressionCodec)
+
+ blob0, err := r.ReadBlob(0)
+ require.NoError(t, err)
+ assert.Equal(t, []byte("abcdefghi"), blob0.Data)
+
+ // Blob 1: type="some-other-blob", contains NUL byte and emoji
+ assert.Equal(t, puffin.BlobType("some-other-blob"), blobs[1].Type)
+ assert.Equal(t, []int32{2}, blobs[1].Fields)
+ assert.Equal(t, int64(2), blobs[1].SnapshotID)
+ assert.Equal(t, int64(1), blobs[1].SequenceNumber)
+ assert.Equal(t, int64(13), blobs[1].Offset)
+ assert.Equal(t, int64(83), blobs[1].Length)
+ assert.Nil(t, blobs[1].CompressionCodec)
+
+ blob1, err := r.ReadBlob(1)
+ require.NoError(t, err)
+ expectedBlob1 := []byte("some blob \x00 binary data \xf0\x9f\xa4\xaf
that is not very very very very very very long, is it?")
+ assert.Equal(t, expectedBlob1, blob1.Data)
+ assert.Len(t, blob1.Data, 83)
+}
+
+// TestReadJavaCompressedMetricData verifies our reader correctly rejects
+// the zstd-compressed fixture since compression is not yet supported.
+func TestReadJavaCompressedMetricData(t *testing.T) {
+ data := readFixture(t, "sample-metric-data-compressed-zstd.bin")
+ r, err := puffin.NewReader(bytes.NewReader(data))
+ require.NoError(t, err)
+
+ // Metadata should be readable (compression is per-blob, not footer)
+ blobs := r.Blobs()
+ require.Len(t, blobs, 2)
+
+ // But reading blob data should fail due to compression
+ _, err = r.ReadBlob(0)
+ assert.ErrorContains(t, err, "compression")
+}
+
+// TestWriterBitIdenticalWithJava verifies that our writer produces
+// byte-identical output to the Java implementation's canonical fixture.
+func TestWriterBitIdenticalWithJava(t *testing.T) {
+ expected := readFixture(t, "sample-metric-data-uncompressed.bin")
+
+ w, buf := newWriter()
+ require.NoError(t, w.SetCreatedBy("Test 1234"))
+
+ _, err := w.AddBlob(puffin.BlobMetadataInput{
+ Type: "some-blob",
+ Fields: []int32{1},
+ SnapshotID: 2,
+ SequenceNumber: 1,
+ }, []byte("abcdefghi"))
+ require.NoError(t, err)
+
+ _, err = w.AddBlob(puffin.BlobMetadataInput{
+ Type: "some-other-blob",
+ Fields: []int32{2},
+ SnapshotID: 2,
+ SequenceNumber: 1,
+ }, []byte("some blob \x00 binary data \xf0\x9f\xa4\xaf that is not very
very very very very very long, is it?"))
+ require.NoError(t, err)
+
+ require.NoError(t, w.Finish())
+ assert.Equal(t, expected, buf.Bytes())
+}
diff --git a/puffin/puffin_writer.go b/puffin/puffin_writer.go
new file mode 100644
index 00000000..fd0a39dd
--- /dev/null
+++ b/puffin/puffin_writer.go
@@ -0,0 +1,238 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package puffin
+
+import (
+ "encoding/binary"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "math"
+
+ "github.com/apache/iceberg-go"
+)
+
// Writer writes blobs and metadata to a Puffin file.
//
// Usage:
//
//	w, err := puffin.NewWriter(file)
//	if err != nil {
//		return err
//	}
//	_, err = w.AddBlob(puffin.BlobMetadataInput{
//		Type:       puffin.BlobTypeDataSketchesTheta,
//		SnapshotID: 123,
//		Fields:     []int32{1},
//	}, sketchBytes)
//	if err != nil {
//		return err
//	}
//	return w.Finish()
type Writer struct {
	w         io.Writer         // destination stream; the header magic is written at construction
	offset    int64             // byte offset where the next blob will be written
	blobs     []BlobMetadata    // metadata collected for every blob, emitted in the footer
	props     map[string]string // file-level properties emitted in the footer
	done      bool              // set by Finish; guards all mutating methods afterwards
	createdBy string            // value written as the "created-by" footer property
}
+
// BlobMetadataInput contains fields the caller provides when adding a blob.
// Offset, Length, and CompressionCodec are set by the writer.
type BlobMetadataInput struct {
	Type           BlobType          // required; identifies the blob's content
	SnapshotID     int64             // Iceberg snapshot the blob refers to
	SequenceNumber int64             // sequence number associated with the snapshot
	Fields         []int32           // required; field IDs the blob applies to
	Properties     map[string]string // optional blob-level properties for the footer
}
+
+// NewWriter creates a new Writer and writes the file header magic.
+// The caller is responsible for closing the underlying writer after Finish
returns.
+func NewWriter(w io.Writer) (*Writer, error) {
+ if w == nil {
+ return nil, errors.New("puffin: writer is nil")
+ }
+
+ // Write header magic bytes
+ if err := writeAll(w, magic[:]); err != nil {
+ return nil, fmt.Errorf("puffin: write header magic: %w", err)
+ }
+
+ return &Writer{
+ w: w,
+ offset: MagicSize,
+ props: make(map[string]string),
+ createdBy: "iceberg-go " + iceberg.Version(),
+ }, nil
+}
+
+// SetProperties merges the provided properties into the file-level properties
+// written to the footer. Can be called multiple times before Finish.
+func (w *Writer) AddProperties(props map[string]string) error {
+ if w.done {
+ return errors.New("puffin: cannot set properties: writer
already finalized")
+ }
+ for k, v := range props {
+ w.props[k] = v
+ }
+
+ return nil
+}
+
// ClearProperties discards all file-level properties added so far.
// It does not reset the created-by value, which is tracked separately.
// NOTE(review): unlike the other mutators, this is not guarded by w.done —
// confirm whether clearing after Finish should be allowed.
func (w *Writer) ClearProperties() {
	w.props = make(map[string]string)
}
+
+// SetCreatedBy overrides the default "created-by" property written to the
footer.
+// The default value is "iceberg-go". Example: "MyApp version 1.2.3".
+func (w *Writer) SetCreatedBy(createdBy string) error {
+ if w.done {
+ return errors.New("puffin: cannot set created-by: writer
already finalized")
+ }
+ if createdBy == "" {
+ return errors.New("puffin: cannot set created-by: value cannot
be empty")
+ }
+ w.createdBy = createdBy
+
+ return nil
+}
+
+// AddBlob writes blob data and records its metadata for the footer.
+// Returns the complete BlobMetadata including the computed Offset and Length.
+// The input.Type is required; use constants like ApacheDataSketchesThetaV1.
+func (w *Writer) AddBlob(input BlobMetadataInput, data []byte) (BlobMetadata,
error) {
+ if w.done {
+ return BlobMetadata{}, errors.New("puffin: cannot add blob:
writer already finalized")
+ }
+ if input.Type == "" {
+ return BlobMetadata{}, errors.New("puffin: cannot add blob:
type is required")
+ }
+ if input.Fields == nil {
+ return BlobMetadata{}, errors.New("puffin: cannot add blob:
fields is required")
+ }
+
+ // Deletion vectors have special requirements per spec
+ if input.Type == BlobTypeDeletionVector {
+ if input.SnapshotID != -1 {
+ return BlobMetadata{}, errors.New("puffin:
deletion-vector-v1 requires snapshot-id to be -1")
+ }
+ if input.SequenceNumber != -1 {
+ return BlobMetadata{}, errors.New("puffin:
deletion-vector-v1 requires sequence-number to be -1")
+ }
+ }
+
+ meta := BlobMetadata{
+ Type: input.Type,
+ SnapshotID: input.SnapshotID,
+ SequenceNumber: input.SequenceNumber,
+ Fields: input.Fields,
+ Offset: w.offset,
+ Length: int64(len(data)),
+ Properties: input.Properties,
+ }
+
+ if err := writeAll(w.w, data); err != nil {
+ return BlobMetadata{}, fmt.Errorf("puffin: write blob: %w", err)
+ }
+
+ w.offset += meta.Length
+ w.blobs = append(w.blobs, meta)
+
+ return meta, nil
+}
+
+// Finish writes the footer and completes the Puffin file structure.
+// Must be called exactly once after all blobs are written.
+// After Finish returns, no further operations are allowed on the writer.
+func (w *Writer) Finish() error {
+ if w.done {
+ return errors.New("puffin: cannot finish: writer already
finalized")
+ }
+
+ // Build footer
+ blobs := w.blobs
+ if blobs == nil {
+ blobs = []BlobMetadata{}
+ }
+
+ // Only include properties in the footer if there are any to write.
+ // Add created-by first, then decide whether to include properties.
+ if w.createdBy != "" {
+ w.props[CreatedBy] = w.createdBy
+ }
+ var props map[string]string
+ if len(w.props) > 0 {
+ props = w.props
+ }
+
+ footer := Footer{
+ Blobs: blobs,
+ Properties: props,
+ }
+
+ payload, err := json.Marshal(footer)
+ if err != nil {
+ return fmt.Errorf("puffin: marshal footer: %w", err)
+ }
+
+ // Check footer size fits in int32
+ if len(payload) > math.MaxInt32 {
+ return fmt.Errorf("puffin: footer too large: %d bytes exceeds
2GB limit", len(payload))
+ }
+
+ // Write footer start magic
+ if err := writeAll(w.w, magic[:]); err != nil {
+ return fmt.Errorf("puffin: write footer magic: %w", err)
+ }
+
+ // Write footer payload
+ if err := writeAll(w.w, payload); err != nil {
+ return fmt.Errorf("puffin: write footer payload: %w", err)
+ }
+
+ // Write trailer: PayloadSize(4) + Flags(4) + Magic(4)
+ var trailer [footerTrailerSize]byte
+ binary.LittleEndian.PutUint32(trailer[0:4], uint32(len(payload)))
+ binary.LittleEndian.PutUint32(trailer[4:8], 0) // flags = 0
(uncompressed)
+ copy(trailer[8:12], magic[:])
+
+ if err := writeAll(w.w, trailer[:]); err != nil {
+ return fmt.Errorf("puffin: write footer trailer: %w", err)
+ }
+
+ w.done = true
+
+ return nil
+}
+
+// writeAll writes all bytes to w or returns an error.
+// Handles the io.Writer contract where Write can return n < len(data) without
error.
+func writeAll(w io.Writer, data []byte) error {
+ n, err := w.Write(data)
+ if err != nil {
+ return err
+ }
+ if n != len(data) {
+ return fmt.Errorf("short write: wrote %d of %d bytes", n,
len(data))
+ }
+
+ return nil
+}
diff --git a/puffin/testdata/README.md b/puffin/testdata/README.md
new file mode 100644
index 00000000..084df93c
--- /dev/null
+++ b/puffin/testdata/README.md
@@ -0,0 +1,21 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+These test fixture files are canonical Puffin files from the Apache Iceberg
Java implementation:
+https://github.com/apache/iceberg/tree/main/core/src/test/resources/org/apache/iceberg/puffin/v1
diff --git a/puffin/testdata/empty-puffin-uncompressed.bin
b/puffin/testdata/empty-puffin-uncompressed.bin
new file mode 100644
index 00000000..142b45bd
Binary files /dev/null and b/puffin/testdata/empty-puffin-uncompressed.bin
differ
diff --git a/puffin/testdata/sample-metric-data-compressed-zstd.bin
b/puffin/testdata/sample-metric-data-compressed-zstd.bin
new file mode 100644
index 00000000..ac8b69c7
Binary files /dev/null and
b/puffin/testdata/sample-metric-data-compressed-zstd.bin differ
diff --git a/puffin/testdata/sample-metric-data-uncompressed.bin
b/puffin/testdata/sample-metric-data-uncompressed.bin
new file mode 100644
index 00000000..ab8da138
Binary files /dev/null and
b/puffin/testdata/sample-metric-data-uncompressed.bin differ