This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 19bec10a feat: puffin Reader and Writer (#676)
19bec10a is described below

commit 19bec10ab3a697e604000d0c52cf58c5d6f8c6b7
Author: Shreyas Mishra <[email protected]>
AuthorDate: Tue Feb 24 21:55:18 2026 +0530

    feat: puffin Reader and Writer (#676)
    
    for #589
    
      - Added PuffinWriter
      - Added PuffinReader
      - Uncompressed only (the reader rejects compressed footers and blobs)
    
    ---------
    
    Signed-off-by: Shreyas220 <[email protected]>
---
 dev/release/rat_exclude_files.txt                  |   6 +-
 puffin/puffin.go                                   |  81 +++
 puffin/puffin_reader.go                            | 375 ++++++++++++++
 puffin/puffin_test.go                              | 557 +++++++++++++++++++++
 puffin/puffin_writer.go                            | 238 +++++++++
 puffin/testdata/README.md                          |  21 +
 puffin/testdata/empty-puffin-uncompressed.bin      | Bin 0 -> 32 bytes
 .../sample-metric-data-compressed-zstd.bin         | Bin 0 -> 417 bytes
 .../testdata/sample-metric-data-uncompressed.bin   | Bin 0 -> 355 bytes
 9 files changed, 1277 insertions(+), 1 deletion(-)

diff --git a/dev/release/rat_exclude_files.txt 
b/dev/release/rat_exclude_files.txt
index 08ab050c..c9ff5498 100644
--- a/dev/release/rat_exclude_files.txt
+++ b/dev/release/rat_exclude_files.txt
@@ -21,4 +21,8 @@ NOTICE
 go.sum
 build
 rat-results.txt
-operation_string.go
\ No newline at end of file
+operation_string.go
+empty-puffin-uncompressed.bin
+sample-metric-data-uncompressed.bin
+sample-metric-data-compressed-zstd.bin
+puffin/testdata/*
\ No newline at end of file
diff --git a/puffin/puffin.go b/puffin/puffin.go
new file mode 100644
index 00000000..85e8fbe1
--- /dev/null
+++ b/puffin/puffin.go
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package puffin provides reading and writing of Puffin files.
+//
+// Puffin is a file format designed to store statistics and indexes
+// for Iceberg tables. A Puffin file contains blobs (opaque byte sequences,
+// such as Apache DataSketches sketches or deletion vectors) with associated metadata.
+//
+// File structure:
+//
+//     [Magic] [Blob]* [Magic] [Footer Payload] [Footer Payload Size] [Flags] 
[Magic]
+//
+// See the specification at https://iceberg.apache.org/puffin-spec/
+package puffin
+
+var magic = [4]byte{'P', 'F', 'A', '1'} // "PFA1" marker expected at the file start, the footer start, and the file end
+
+const (
+       // MagicSize is the number of bytes in the magic marker.
+       // Footer layout: [Magic] [FooterPayload] [FooterPayloadSize] [Flags] [Magic]
+       MagicSize = 4
+
+       // footerTrailerSize accounts for footer length (4)+ flags (4) + 
trailing magic (4).
+       footerTrailerSize = 12
+
+       // FooterFlagCompressed indicates a compressed footer; unsupported in 
this implementation.
+       FooterFlagCompressed = 1 // bit 0
+
+       // DefaultMaxBlobSize caps blob reads so a bogus length cannot exhaust memory.
+       // It is the maximum blob size allowed when reading 
(256 MB).
+       // Override with WithMaxBlobSize when creating a reader.
+       DefaultMaxBlobSize = 256 << 20
+
+       // CreatedBy is the footer property key for a human-readable identification of the application 
writing the file, along with its version.
+       // Example: "Trino version 381".
+       CreatedBy = "created-by"
+)
+
+type BlobMetadata struct { // BlobMetadata is one entry of the footer's "blobs" array.
+       Type             BlobType          `json:"type"` // required: the reader rejects an empty type
+       Fields           []int32           `json:"fields"`
+       SnapshotID       int64             `json:"snapshot-id"`
+       SequenceNumber   int64             `json:"sequence-number"`
+       Offset           int64             `json:"offset"` // absolute file offset of the blob; the reader requires >= MagicSize
+       Length           int64             `json:"length"` // blob size in bytes; offset+length must not pass the footer start
+       CompressionCodec *string           `json:"compression-codec,omitempty"` // nil or "" means uncompressed; the reader rejects other values
+       Properties       map[string]string `json:"properties,omitempty"`
+}
+
+// Footer describes the blobs and file-level properties stored in a Puffin 
file.
+type Footer struct {
+       Blobs      []BlobMetadata    `json:"blobs"` // one entry per blob, in footer order
+       Properties map[string]string `json:"properties,omitempty"` // file-level key/value pairs, e.g. the CreatedBy key
+}
+
+type BlobType string // BlobType is the value of a blob's "type" field in the footer JSON.
+
+const (
+       // BlobTypeDataSketchesTheta is a serialized compact Theta sketch
+       // produced by the Apache DataSketches library.
+       BlobTypeDataSketchesTheta BlobType = "apache-datasketches-theta-v1"
+
+       // BlobTypeDeletionVector is a serialized deletion vector per the
+       // Iceberg spec. Requires snapshot-id and sequence-number to be -1.
+       BlobTypeDeletionVector BlobType = "deletion-vector-v1"
+)
diff --git a/puffin/puffin_reader.go b/puffin/puffin_reader.go
new file mode 100644
index 00000000..d178a2ab
--- /dev/null
+++ b/puffin/puffin_reader.go
@@ -0,0 +1,375 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package puffin
+
+import (
+       "bytes"
+       "encoding/binary"
+       "encoding/json"
+       "errors"
+       "fmt"
+       "io"
+       "sort"
+)
+
+// ReaderAtSeeker combines io.ReaderAt and io.Seeker for reading Puffin files.
+// This interface is implemented by *os.File, *bytes.Reader, and similar types.
+type ReaderAtSeeker interface {
+       io.ReaderAt // positional reads for the header magic, footer, and blobs
+       io.Seeker // used by NewReader to discover the file size
+}
+
+// Reader reads blobs and metadata from a Puffin file.
+//
+// Usage:
+//
+//     r, err := puffin.NewReader(file)
+//     if err != nil {
+//         return err
+//     }
+//     for i := range r.Blobs() {
+//         blob, err := r.ReadBlob(i)
+//         // process blob.Data
+//     }
+type Reader struct {
+       r           ReaderAtSeeker // underlying file; never closed by Reader
+       size        int64 // total file size as reported by Seek in NewReader
+       footer      Footer // parsed footer, populated by readFooter
+       footerStart int64 // offset of the footer's leading magic; set by readFooter
+       maxBlobSize int64 // upper bound on a single read, enforced by validateRange
+}
+
+// BlobData pairs a blob's metadata with its content.
+type BlobData struct {
+       Metadata BlobMetadata // copy of the footer entry the data was read with
+       Data     []byte // raw blob bytes exactly as stored (no decompression is performed)
+}
+
+// ReaderOption configures a Reader; options are passed to NewReader.
+type ReaderOption func(*Reader)
+
+// WithMaxBlobSize sets the maximum blob size allowed when reading.
+// This prevents OOM attacks from malicious files with huge blob lengths.
+// Default is DefaultMaxBlobSize (256 MB). The limit also applies to Reader.ReadAt.
+func WithMaxBlobSize(size int64) ReaderOption {
+       return func(r *Reader) {
+               r.maxBlobSize = size // stored as given; size itself is not validated here
+       }
+}
+
+// NewReader creates a new Puffin file reader.
+// The file size is auto-detected using Seek.
+// It validates magic bytes and reads the footer eagerly.
+// The caller is responsible for closing the underlying reader.
+func NewReader(r ReaderAtSeeker, opts ...ReaderOption) (*Reader, error) {
+       if r == nil {
+               return nil, errors.New("puffin: reader is nil")
+       }
+
+       // Auto-detect file size
+       size, err := r.Seek(0, io.SeekEnd) // leaves the seek position at the end; reads below use ReadAt
+       if err != nil {
+               return nil, fmt.Errorf("puffin: detect file size: %w", err)
+       }
+
+       // Minimum size: header magic + footer magic + footer trailer
+       // [Magic] + zero for blob + [Magic] + [FooterPayloadSize (assuming 
~0)] + [Flags] + [Magic]
+       minSize := int64(MagicSize + MagicSize + footerTrailerSize)
+       if size < minSize {
+               return nil, fmt.Errorf("puffin: file too small (%d bytes, 
minimum %d)", size, minSize)
+       }
+
+       // Validate header magic
+       var headerMagic [MagicSize]byte
+       if _, err := r.ReadAt(headerMagic[:], 0); err != nil {
+               return nil, fmt.Errorf("puffin: read header magic: %w", err)
+       }
+       if !bytes.Equal(headerMagic[:], magic[:]) {
+               return nil, errors.New("puffin: invalid header magic")
+       }
+
+       pr := &Reader{
+               r:           r,
+               size:        size,
+               maxBlobSize: DefaultMaxBlobSize,
+       }
+
+       // Options run before the footer is read, so they are in effect for readFooter.
+       for _, opt := range opts {
+               opt(pr)
+       }
+
+       // Read and validate the footer; on failure no Reader is returned.
+       if err := pr.readFooter(); err != nil {
+               return nil, err
+       }
+
+       return pr, nil
+}
+
+// Blobs returns the blob metadata entries from the footer.
+func (r *Reader) Blobs() []BlobMetadata {
+       return r.footer.Blobs // not copied: the caller shares the Reader's slice
+}
+
+// Properties returns the file-level properties from the footer.
+func (r *Reader) Properties() map[string]string {
+       return r.footer.Properties // not copied: the caller shares the Reader's map
+}
+
+// defaultFooterReadSize is the initial read size (capped at the file size) used by readFooter.
+// We read more than strictly needed to hopefully get the entire footer
+// in one read, reducing round-trips on cloud object storage.
+const defaultFooterReadSize = 8 * 1024 // 8 KB
+
+// readFooter reads, validates, and parses the footer, then stores it and footerStart on r.
+func (r *Reader) readFooter() error {
+       // Read a larger chunk from the end to minimize round-trips on cloud 
storage.
+       // This often captures the entire footer in one read.
+       readSize := min(int64(defaultFooterReadSize), r.size)
+       buf := make([]byte, readSize)
+       if _, err := r.r.ReadAt(buf, r.size-readSize); err != nil { // NOTE(review): io.ReaderAt may return io.EOF with a full read that ends at EOF; that is treated as a failure here
+               return fmt.Errorf("puffin: read footer region: %w", err)
+       }
+
+       // Parse trailer from end of buffer: PayloadSize(4) + Flags(4) + 
Magic(4)
+       trailer := buf[len(buf)-footerTrailerSize:]
+
+       // Validate trailing magic
+       if !bytes.Equal(trailer[8:12], magic[:]) {
+               return errors.New("puffin: invalid trailing magic in footer")
+       }
+
+       // Extract payload size and flags (both little-endian 32-bit values)
+       payloadSize := int64(binary.LittleEndian.Uint32(trailer[0:4]))
+       flags := binary.LittleEndian.Uint32(trailer[4:8])
+
+       // Check for compressed footer (unsupported)
+       if flags&FooterFlagCompressed != 0 {
+               return errors.New("puffin: compressed footer not supported")
+       }
+
+       // Check for unknown flags
+       if flags&^uint32(FooterFlagCompressed) != 0 {
+               return fmt.Errorf("puffin: unknown footer flags set: 0x%x", 
flags)
+       }
+
+       // Validate payload size. NOTE(review): payloadSize is widened from a uint32, so it can never be negative.
+       if payloadSize < 0 {
+               return fmt.Errorf("puffin: invalid footer payload size %d", 
payloadSize)
+       }
+
+       // Calculate footer start position
+       // Layout: [header magic (4)] [blobs...] [footer magic (4)] [JSON 
(payloadSize)] [trailer (12)]
+       footerStart := r.size - footerTrailerSize - payloadSize - MagicSize
+       if footerStart < MagicSize {
+               return fmt.Errorf("puffin: footer payload size %d exceeds 
available space", payloadSize)
+       }
+
+       // Total footer size: magic(4) + payload + trailer(12)
+       totalFooterSize := MagicSize + payloadSize + footerTrailerSize
+
+       // Validate footer start magic
+       if totalFooterSize <= readSize {
+               // We already have the footer magic in buf
+               footerOffset := len(buf) - int(totalFooterSize)
+               if !bytes.Equal(buf[footerOffset:footerOffset+MagicSize], 
magic[:]) {
+                       return errors.New("puffin: invalid footer start magic")
+               }
+       } else {
+               // Footer is larger than our initial read, need to read magic 
separately
+               var footerMagic [MagicSize]byte
+               if _, err := r.r.ReadAt(footerMagic[:], footerStart); err != 
nil {
+                       return fmt.Errorf("puffin: read footer start magic: 
%w", err)
+               }
+               if !bytes.Equal(footerMagic[:], magic[:]) {
+                       return errors.New("puffin: invalid footer start magic")
+               }
+       }
+
+       payloadReader := io.NewSectionReader(r.r, footerStart+MagicSize, 
payloadSize)
+       var footer Footer // the payload is re-read from r.r here even when buf already contains it
+       if err := json.NewDecoder(payloadReader).Decode(&footer); err != nil { // Decode consumes the first JSON value only; any bytes after it are not checked
+               return fmt.Errorf("puffin: decode footer JSON: %w", err)
+       }
+
+       // Validate blob metadata before anything is stored on r
+       if err := r.validateBlobs(footer.Blobs, footerStart); err != nil {
+               return err
+       }
+
+       r.footer = footer
+       r.footerStart = footerStart
+
+       return nil
+}
+
+// ReadBlob reads the content of a specific blob by index.
+// It uses the footer already parsed by NewReader; only the blob bytes are read here.
+func (r *Reader) ReadBlob(index int) (*BlobData, error) {
+       footer := r.footer
+
+       if index < 0 || index >= len(footer.Blobs) {
+               return nil, fmt.Errorf("puffin: blob index %d out of range [0, 
%d)", index, len(footer.Blobs))
+       }
+
+       meta := footer.Blobs[index]
+       data, err := r.readBlobData(meta)
+       if err != nil {
+               return nil, err
+       }
+
+       return &BlobData{Metadata: meta, Data: data}, nil
+}
+
+// ReadBlobByMetadata reads a blob using its metadata directly.
+// This is useful when you have metadata from an external source.
+func (r *Reader) ReadBlobByMetadata(meta BlobMetadata) ([]byte, error) {
+       return r.readBlobData(meta) // meta is range-checked but not matched against the footer entries
+}
+
+// readBlobData validates meta and returns the raw bytes at [Offset, Offset+Length).
+func (r *Reader) readBlobData(meta BlobMetadata) ([]byte, error) {
+       // Validate blob type
+       if meta.Type == "" {
+               return nil, errors.New("puffin: cannot read blob: type is 
required")
+       }
+
+       // Check for compressed blob (unsupported); a nil or empty codec means uncompressed
+       if meta.CompressionCodec != nil && *meta.CompressionCodec != "" {
+               return nil, fmt.Errorf("puffin: cannot read blob: compression 
%q not supported", *meta.CompressionCodec)
+       }
+
+       // Validate offset/length (also enforces the maximum blob size)
+       if err := r.validateRange(meta.Offset, meta.Length); err != nil {
+               return nil, fmt.Errorf("puffin: blob: %w", err)
+       }
+
+       // Read blob data
+       data := make([]byte, meta.Length) // Length is bounded by maxBlobSize after validateRange
+       if _, err := r.r.ReadAt(data, meta.Offset); err != nil {
+               return nil, fmt.Errorf("puffin: read blob data: %w", err)
+       }
+
+       return data, nil
+}
+
+// ReadAllBlobs reads every blob and returns them in footer order (nil, nil when the footer lists none).
+func (r *Reader) ReadAllBlobs() ([]*BlobData, error) {
+       footer := r.footer
+
+       if len(footer.Blobs) == 0 {
+               return nil, nil
+       }
+
+       // Create index mapping to preserve original order
+       type indexedBlob struct {
+               index int // position of the entry in footer.Blobs
+               meta  BlobMetadata
+       }
+       indexed := make([]indexedBlob, len(footer.Blobs))
+       for i, meta := range footer.Blobs {
+               indexed[i] = indexedBlob{index: i, meta: meta}
+       }
+
+       // Sort by offset for sequential I/O
+       sort.Slice(indexed, func(i, j int) bool {
+               return indexed[i].meta.Offset < indexed[j].meta.Offset
+       })
+
+       // Read blobs in offset order, store in original order
+       results := make([]*BlobData, len(footer.Blobs))
+       for _, ib := range indexed {
+               data, err := r.readBlobData(ib.meta)
+               if err != nil { // the first failure aborts; no partial results are returned
+                       return nil, fmt.Errorf("puffin: read blob %d: %w", 
ib.index, err)
+               }
+               results[ib.index] = &BlobData{Metadata: ib.meta, Data: data}
+       }
+
+       return results, nil
+}
+
+// ReadAt implements io.ReaderAt, reading from the blob data region.
+// It rejects any range that starts inside the header magic, extends into the
+// footer, or is longer than the maximum blob size. This suits deletion vectors,
+// where a manifest's offset/length point directly into the Puffin file.
+func (r *Reader) ReadAt(p []byte, off int64) (n int, err error) {
+       if err := r.validateRange(off, int64(len(p))); err != nil {
+               return 0, fmt.Errorf("puffin: %w", err)
+       }
+
+       return r.r.ReadAt(p, off)
+}
+
+// validateRange validates offset and length for reading from the blob data 
region.
+func (r *Reader) validateRange(offset, length int64) error {
+       if length < 0 {
+               return fmt.Errorf("invalid length %d", length)
+       }
+       if length > r.maxBlobSize { // limit from DefaultMaxBlobSize or WithMaxBlobSize
+               return fmt.Errorf("size %d exceeds limit %d", length, 
r.maxBlobSize)
+       }
+       if offset < MagicSize { // the first MagicSize bytes hold the header magic
+               return fmt.Errorf("invalid offset %d (before header)", offset)
+       }
+
+       end := offset + length
+       if end < offset { // int64 wrap-around
+               return fmt.Errorf("offset+length overflow: offset=%d 
length=%d", offset, length)
+       }
+       if end > r.footerStart { // end == footerStart is allowed: the range may end where the footer begins
+               return fmt.Errorf("extends into footer: offset=%d length=%d 
footerStart=%d",
+                       offset, length, r.footerStart)
+       }
+
+       return nil
+}
+
+// validateBlobs checks every footer entry; footerStart is a parameter because readFooter sets r.footerStart only afterwards.
+func (r *Reader) validateBlobs(blobs []BlobMetadata, footerStart int64) error {
+       for i, blob := range blobs {
+               // Type is required
+               if blob.Type == "" {
+                       return fmt.Errorf("puffin: blob %d: type is required", 
i)
+               }
+
+               // Length must be non-negative (the maximum blob size is not checked here)
+               if blob.Length < 0 {
+                       return fmt.Errorf("puffin: blob %d: invalid length %d", 
i, blob.Length)
+               }
+
+               // Offset must be after header magic
+               if blob.Offset < MagicSize {
+                       return fmt.Errorf("puffin: blob %d: offset %d before 
header", i, blob.Offset)
+               }
+
+               // Check for int64 overflow of offset+length
+               end := blob.Offset + blob.Length
+               if end < blob.Offset {
+                       return fmt.Errorf("puffin: blob %d: offset+length 
overflow: offset=%d length=%d", i, blob.Offset, blob.Length)
+               }
+
+               // Blob must not extend into footer
+               if end > footerStart {
+                       return fmt.Errorf("puffin: blob %d: extends into 
footer: offset=%d length=%d footerStart=%d",
+                               i, blob.Offset, blob.Length, footerStart)
+               }
+       }
+
+       return nil
+}
diff --git a/puffin/puffin_test.go b/puffin/puffin_test.go
new file mode 100644
index 00000000..2752e15f
--- /dev/null
+++ b/puffin/puffin_test.go
@@ -0,0 +1,557 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package puffin_test
+
+import (
+       "bytes"
+       "math"
+       "os"
+       "path"
+       "testing"
+
+       "github.com/apache/iceberg-go/puffin"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+// --- Test Helpers ---
+
+func newWriter() (*puffin.Writer, *bytes.Buffer) { // newWriter returns a Writer together with the buffer it writes into
+       buf := &bytes.Buffer{}
+       w, _ := puffin.NewWriter(buf) // NOTE(review): the NewWriter error is discarded
+
+       return w, buf
+}
+
+func newReader(t *testing.T, buf *bytes.Buffer) *puffin.Reader { // newReader opens buf's current bytes as a Puffin file
+       r, err := puffin.NewReader(bytes.NewReader(buf.Bytes()))
+       require.NoError(t, err) // stops the calling test if the bytes are not a readable Puffin file
+
+       return r
+}
+
+func defaultBlobInput() puffin.BlobMetadataInput { // defaultBlobInput returns blob metadata with only Type and Fields set
+       return puffin.BlobMetadataInput{
+               Type:   puffin.BlobTypeDataSketchesTheta,
+               Fields: []int32{}, // empty but non-nil; the "nil fields" subtest expects nil to be rejected
+       }
+}
+
+func validFile() []byte { // validFile returns the bytes of a finished Puffin file without blobs
+       w, buf := newWriter()
+       w.Finish() // NOTE(review): the Finish error is discarded
+
+       return buf.Bytes()
+}
+
+func validFileWithBlob() []byte { // validFileWithBlob returns the bytes of a finished Puffin file holding one blob
+       w, buf := newWriter()
+       w.AddBlob(defaultBlobInput(), []byte("test data")) // NOTE(review): the AddBlob error is discarded
+       w.Finish() // NOTE(review): the Finish error is discarded
+
+       return buf.Bytes()
+}
+
+// --- Tests ---
+
+// TestRoundTrip verifies that data written by Writer can be read back by 
Reader.
+// This is the core integration test ensuring the puffin format is correctly 
implemented.
+func TestRoundTrip(t *testing.T) {
+       blob1Data := []byte("theta sketch data here")
+       blob2Data := []byte("another blob with different content")
+
+       w, buf := newWriter()
+       w.AddProperties(map[string]string{"test-property": "test-value"}) // NOTE(review): returned error is not checked
+
+       meta1, err := w.AddBlob(puffin.BlobMetadataInput{
+               Type:           puffin.BlobTypeDataSketchesTheta,
+               SnapshotID:     123,
+               SequenceNumber: 1,
+               Fields:         []int32{1, 2, 3},
+               Properties:     map[string]string{"ndv": "1000"},
+       }, blob1Data)
+       require.NoError(t, err)
+
+       meta2, err := w.AddBlob(puffin.BlobMetadataInput{
+               Type:           puffin.BlobTypeDeletionVector,
+               SnapshotID:     -1,
+               SequenceNumber: -1,
+               Fields:         []int32{},
+               Properties:     map[string]string{"referenced-data-file": 
"data/file.parquet"},
+       }, blob2Data)
+       require.NoError(t, err)
+       require.NoError(t, w.Finish())
+
+       r := newReader(t, buf)
+       blobs := r.Blobs()
+       props := r.Properties()
+
+       assert.Len(t, blobs, 2)
+       assert.Equal(t, "test-value", props["test-property"])
+       assert.Contains(t, props[puffin.CreatedBy], "iceberg-go")
+
+       // Verify blob 1 metadata against the input and the values AddBlob returned
+       assert.Equal(t, puffin.BlobTypeDataSketchesTheta, blobs[0].Type)
+       assert.Equal(t, int64(123), blobs[0].SnapshotID)
+       assert.Equal(t, int64(1), blobs[0].SequenceNumber)
+       assert.Equal(t, []int32{1, 2, 3}, blobs[0].Fields)
+       assert.Equal(t, meta1.Offset, blobs[0].Offset)
+       assert.Equal(t, meta1.Length, blobs[0].Length)
+       assert.Equal(t, "1000", blobs[0].Properties["ndv"])
+
+       // Verify blob 2 metadata
+       assert.Equal(t, puffin.BlobTypeDeletionVector, blobs[1].Type)
+       assert.Equal(t, int64(-1), blobs[1].SnapshotID)
+       assert.Equal(t, int64(-1), blobs[1].SequenceNumber)
+       assert.Equal(t, meta2.Offset, blobs[1].Offset)
+       assert.Equal(t, meta2.Length, blobs[1].Length)
+
+       // Verify data. NOTE(review): read errors are discarded; a failed read would panic on the nil *BlobData.
+       blobData1, _ := r.ReadBlob(0)
+       assert.Equal(t, blob1Data, blobData1.Data)
+
+       blobData2, _ := r.ReadBlob(1)
+       assert.Equal(t, blob2Data, blobData2.Data)
+
+       allBlobs, _ := r.ReadAllBlobs()
+       assert.Len(t, allBlobs, 2)
+       assert.Equal(t, blob1Data, allBlobs[0].Data)
+       assert.Equal(t, blob2Data, allBlobs[1].Data)
+}
+
+// TestEmptyFile verifies that a puffin file with no blobs is valid.
+// Empty files are valid per spec and used when no statistics exist yet.
+func TestEmptyFile(t *testing.T) {
+       w, buf := newWriter()
+       require.NoError(t, w.Finish())
+
+       r := newReader(t, buf)
+       assert.Len(t, r.Blobs(), 0)
+       assert.Contains(t, r.Properties()[puffin.CreatedBy], "iceberg-go")
+
+       blobs, err := r.ReadAllBlobs()
+       require.NoError(t, err)
+       assert.Nil(t, blobs) // ReadAllBlobs returns nil, not an empty slice, when there are no blobs
+}
+
+// TestEmptyBlobData verifies that a zero-length blob is valid.
+// Some blob types may legitimately have empty content.
+func TestEmptyBlobData(t *testing.T) {
+       w, buf := newWriter()
+       meta, err := w.AddBlob(defaultBlobInput(), []byte{})
+       require.NoError(t, err)
+       assert.Equal(t, int64(0), meta.Length)
+       w.Finish() // NOTE(review): returned error is not checked
+
+       r := newReader(t, buf)
+       blob, _ := r.ReadBlob(0) // NOTE(review): error discarded; on failure blob is nil and the next line panics
+       assert.Empty(t, blob.Data)
+}
+
+// TestLargeFooter verifies reading works when footer exceeds initial 8KB read 
buffer.
+// This exercises the code path where footer requires a second read from 
storage.
+func TestLargeFooter(t *testing.T) {
+       w, buf := newWriter()
+       numBlobs := 200 // intended to make the footer JSON larger than the 8 KB initial read
+       for i := range numBlobs {
+               w.AddBlob(puffin.BlobMetadataInput{
+                       Type:       puffin.BlobTypeDataSketchesTheta,
+                       SnapshotID: int64(i),
+                       Fields:     []int32{1, 2, 3, 4, 5},
+                       Properties: map[string]string{"key": 
"value-to-increase-footer-size"},
+               }, []byte("blob")) // NOTE(review): AddBlob result and error are not checked
+       }
+       w.Finish() // NOTE(review): returned error is not checked
+
+       r := newReader(t, buf)
+       assert.Len(t, r.Blobs(), numBlobs)
+}
+
+// TestWriterValidation verifies that Writer rejects invalid input.
+// Each subtest only checks that the error message contains a keyword.
+func TestWriterValidation(t *testing.T) {
+       // nil writer: Ensures graceful failure when no underlying writer is 
provided.
+       t.Run("nil writer", func(t *testing.T) {
+               _, err := puffin.NewWriter(nil)
+               assert.ErrorContains(t, err, "nil")
+       })
+
+       // missing type: Blob type is required per spec to identify the blob 
format.
+       t.Run("missing type", func(t *testing.T) {
+               w, _ := newWriter()
+               _, err := w.AddBlob(puffin.BlobMetadataInput{Type: "", Fields: 
[]int32{}}, []byte("x"))
+               assert.ErrorContains(t, err, "type")
+       })
+
+       // nil fields: Fields slice must be non-nil per spec (empty slice is 
valid).
+       t.Run("nil fields", func(t *testing.T) {
+               w, _ := newWriter()
+               _, err := w.AddBlob(puffin.BlobMetadataInput{Type: 
puffin.BlobTypeDataSketchesTheta, Fields: nil}, []byte("x"))
+               assert.ErrorContains(t, err, "fields")
+       })
+
+       // add blob after finish: Enforces writer state machine - no writes 
after finalization.
+       t.Run("add blob after finish", func(t *testing.T) {
+               w, _ := newWriter()
+               w.Finish() // error not checked; the assertion targets the AddBlob call below
+               _, err := w.AddBlob(defaultBlobInput(), []byte("x"))
+               assert.ErrorContains(t, err, "finalized")
+       })
+
+       // double finish: Prevents accidental double-write of footer corrupting 
the file.
+       t.Run("double finish", func(t *testing.T) {
+               w, _ := newWriter()
+               w.Finish() // first call; its error is not checked
+               assert.ErrorContains(t, w.Finish(), "finalized")
+       })
+
+       // deletion vector invalid snapshot-id: Spec requires snapshot-id=-1 
for deletion vectors.
+       t.Run("deletion vector invalid snapshot-id", func(t *testing.T) {
+               w, _ := newWriter()
+               _, err := w.AddBlob(puffin.BlobMetadataInput{
+                       Type: puffin.BlobTypeDeletionVector, SnapshotID: 123, 
SequenceNumber: -1, Fields: []int32{},
+               }, []byte("x"))
+               assert.ErrorContains(t, err, "snapshot-id")
+       })
+
+       // deletion vector invalid sequence-number: Spec requires 
sequence-number=-1 for deletion vectors.
+       t.Run("deletion vector invalid sequence-number", func(t *testing.T) {
+               w, _ := newWriter()
+               _, err := w.AddBlob(puffin.BlobMetadataInput{
+                       Type: puffin.BlobTypeDeletionVector, SnapshotID: -1, 
SequenceNumber: 5, Fields: []int32{},
+               }, []byte("x"))
+               assert.ErrorContains(t, err, "sequence-number")
+       })
+}
+
+// TestSetCreatedBy verifies the SetCreatedBy method.
+// Allows applications to identify themselves in puffin files for debugging.
+func TestSetCreatedBy(t *testing.T) {
+       // custom value: Verifies custom application identifiers are preserved 
in footer.
+       t.Run("custom value", func(t *testing.T) {
+               w, buf := newWriter()
+               require.NoError(t, w.SetCreatedBy("MyApp 1.0"))
+               w.Finish() // NOTE(review): returned error is not checked
+               r := newReader(t, buf)
+               assert.Equal(t, "MyApp 1.0", r.Properties()[puffin.CreatedBy])
+       })
+
+       // empty rejected: Empty identifier provides no value and likely 
indicates a bug.
+       t.Run("empty rejected", func(t *testing.T) {
+               w, _ := newWriter()
+               assert.ErrorContains(t, w.SetCreatedBy(""), "empty")
+       })
+
+       // after finish rejected: SetCreatedBy must fail once Finish has been called.
+       t.Run("after finish rejected", func(t *testing.T) {
+               w, _ := newWriter()
+               w.Finish() // error not checked; the assertion targets SetCreatedBy
+               assert.ErrorContains(t, w.SetCreatedBy("x"), "finalized")
+       })
+}
+
+// TestClearProperties verifies the ClearProperties method.
+// A property added before ClearProperties must be absent from the written footer.
+func TestClearProperties(t *testing.T) {
+       w, buf := newWriter()
+       w.AddProperties(map[string]string{"key": "value"}) // NOTE(review): returned error is not checked
+       w.ClearProperties()
+       w.Finish() // NOTE(review): returned error is not checked
+
+       r := newReader(t, buf)
+       _, exists := r.Properties()["key"]
+       assert.False(t, exists)
+}
+
+// TestAddPropertiesAfterFinish verifies AddProperties rejects calls after 
finish.
+// Enforces writer state machine - properties cannot be added after footer is 
written.
+func TestAddPropertiesAfterFinish(t *testing.T) {
+       w, _ := newWriter()
+       w.Finish() // error not checked; the assertion targets AddProperties
+       assert.ErrorContains(t, w.AddProperties(map[string]string{"k": "v"}), 
"finalized")
+}
+
+// TestReaderInvalidFile verifies that Reader rejects invalid/corrupt files.
+// Each case corrupts one byte of an empty (blob-free) file and checks the error text.
+func TestReaderInvalidFile(t *testing.T) {
+       tests := []struct {
+               name    string
+               data    func() []byte
+               wantErr string
+       }{
+               // file too small: Minimum valid puffin file has header magic + 
footer, rejects truncated files.
+               {"file too small", func() []byte { return []byte("tiny") }, 
"too small"},
+               // invalid header magic: First 4 bytes must be 'PFA1' to 
identify puffin format.
+               {"invalid header magic", func() []byte {
+                       d := validFile()
+                       d[0] = 'X'
+
+                       return d
+               }, "magic"},
+               // invalid trailing magic: Last 4 bytes must be 'PFA1' to 
detect truncation.
+               {"invalid trailing magic", func() []byte {
+                       d := validFile()
+                       d[len(d)-1] = 'X'
+
+                       return d
+               }, "magic"},
+               // invalid footer start magic: Footer section must start with 
'PFA1' for integrity.
+               {"invalid footer start magic", func() []byte {
+                       d := validFile()
+                       d[4] = 'X' // with no blobs, the footer magic directly follows the 4-byte header magic
+
+                       return d
+               }, "magic"},
+               // unknown flags: Reject files with unsupported features to 
avoid misinterpretation.
+               {"unknown flags", func() []byte {
+                       d := validFile()
+                       d[len(d)-8] = 0x80 // lowest byte of the little-endian flags word; sets bit 7
+
+                       return d
+               }, "unknown"},
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       data := tt.data()
+                       _, err := puffin.NewReader(bytes.NewReader(data))
+                       assert.ErrorContains(t, err, tt.wantErr)
+               })
+       }
+
+       // nil reader: Ensures graceful failure when no underlying reader is 
provided.
+       t.Run("nil reader", func(t *testing.T) {
+               _, err := puffin.NewReader(nil)
+               assert.ErrorContains(t, err, "nil")
+       })
+}
+
+// TestReaderBlobAccess verifies the blob access methods of Reader.
+// Three blobs are written and then read back by index, by metadata and all at
+// once. Setup and read errors are checked explicitly so that a failure is
+// reported at its source instead of as a nil dereference or an index panic.
+func TestReaderBlobAccess(t *testing.T) {
+       w, buf := newWriter()
+       blobs := [][]byte{[]byte("first"), []byte("second"), []byte("third")}
+       for _, b := range blobs {
+               _, err := w.AddBlob(defaultBlobInput(), b)
+               require.NoError(t, err)
+       }
+       require.NoError(t, w.Finish())
+       r := newReader(t, buf)
+
+       // read by index: blobs come back in the order they were written.
+       t.Run("read by index", func(t *testing.T) {
+               for i, expected := range blobs {
+                       blob, err := r.ReadBlob(i)
+                       require.NoError(t, err)
+                       assert.Equal(t, expected, blob.Data)
+               }
+       })
+
+       // index out of range: invalid indices yield an error instead of a panic.
+       t.Run("index out of range", func(t *testing.T) {
+               _, err := r.ReadBlob(-1)
+               assert.Error(t, err)
+               _, err = r.ReadBlob(100)
+               assert.Error(t, err)
+       })
+
+       // read by metadata: direct access when the caller already holds metadata.
+       t.Run("read by metadata", func(t *testing.T) {
+               metas := r.Blobs()
+               // Guard the index below so a short footer fails cleanly.
+               require.Len(t, metas, len(blobs))
+
+               data, err := r.ReadBlobByMetadata(metas[1])
+               require.NoError(t, err)
+               assert.Equal(t, blobs[1], data)
+       })
+
+       // read all preserves order: results match the write order one to one.
+       t.Run("read all preserves order", func(t *testing.T) {
+               all, err := r.ReadAllBlobs()
+               require.NoError(t, err)
+               // Guard the indexing below so a short result fails cleanly.
+               require.Len(t, all, len(blobs))
+
+               for i, expected := range blobs {
+                       assert.Equal(t, expected, all[i].Data)
+               }
+       })
+}
+
+// TestReadBlobByMetadataValidation verifies that ReadBlobByMetadata validates
+// caller-supplied metadata. Each case passes metadata that is invalid in one
+// respect and names a substring the returned error must contain.
+func TestReadBlobByMetadataValidation(t *testing.T) {
+       data := validFileWithBlob()
+       r, err := puffin.NewReader(bytes.NewReader(data))
+       // Without a usable reader every subtest below would fail for the wrong reason.
+       require.NoError(t, err)
+
+       tests := []struct {
+               name    string
+               meta    puffin.BlobMetadata
+               wantErr string
+       }{
+               // empty type: a blob type is required.
+               {"empty type", puffin.BlobMetadata{Type: "", Offset: 4, Length: 1}, "type"},
+               // offset before header: offset 0 points at the header magic bytes.
+               {"offset before header", puffin.BlobMetadata{Type: "t", Offset: 0, Length: 1}, "header"},
+               // negative length: a length below zero is never valid.
+               {"negative length", puffin.BlobMetadata{Type: "t", Offset: 4, Length: -1}, "length"},
+               // extends into footer: the range must end before the footer starts.
+               {"extends into footer", puffin.BlobMetadata{Type: "t", Offset: 4, Length: 9999}, "footer"},
+               // overflow: offset+length must not wrap around int64.
+               {"overflow", puffin.BlobMetadata{Type: "t", Offset: math.MaxInt64, Length: 1}, "overflow"},
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       _, err := r.ReadBlobByMetadata(tt.meta)
+                       assert.ErrorContains(t, err, tt.wantErr)
+               })
+       }
+}
+
+// TestReadAt verifies Reader.ReadAt, which reads a byte range of the file.
+// A single blob is written; reads inside the blob must succeed, while reads
+// that start in the header or run into the footer must be rejected.
+func TestReadAt(t *testing.T) {
+       w, buf := newWriter()
+       meta, err := w.AddBlob(defaultBlobInput(), []byte("hello world"))
+       // meta.Offset is used below, so it must come from a successful AddBlob.
+       require.NoError(t, err)
+       require.NoError(t, w.Finish())
+       r := newReader(t, buf)
+
+       // valid range: a partial read from the start of the blob.
+       t.Run("valid range", func(t *testing.T) {
+               data := make([]byte, 5)
+               n, err := r.ReadAt(data, meta.Offset)
+               require.NoError(t, err)
+               assert.Equal(t, 5, n)
+               assert.Equal(t, []byte("hello"), data)
+       })
+
+       // extends into footer: the destination is sized so that a read starting
+       // right after the header would run past the blob region.
+       t.Run("extends into footer", func(t *testing.T) {
+               data := make([]byte, buf.Len())
+               _, err := r.ReadAt(data, 4)
+               assert.ErrorContains(t, err, "footer")
+       })
+
+       // offset before header: offset 0 points at the header magic.
+       t.Run("offset before header", func(t *testing.T) {
+               data := make([]byte, 4)
+               _, err := r.ReadAt(data, 0)
+               assert.ErrorContains(t, err, "header")
+       })
+}
+
+// Integration tests against canonical Java-generated fixtures.
+// Source: https://github.com/apache/iceberg/tree/main/core/src/test/resources/org/apache/iceberg/puffin/v1
+
+// readFixture loads the named file from the testdata directory and returns its
+// contents. The calling test is failed immediately if the file cannot be read.
+func readFixture(t *testing.T, name string) []byte {
+       t.Helper()
+
+       fixture := path.Join("testdata", name)
+       contents, err := os.ReadFile(fixture)
+       require.NoError(t, err)
+
+       return contents
+}
+
+// TestReadJavaEmptyFile opens the canonical empty Puffin file produced by the
+// Java implementation and expects it to parse with no blobs and no properties.
+func TestReadJavaEmptyFile(t *testing.T) {
+       raw := readFixture(t, "empty-puffin-uncompressed.bin")
+
+       reader, err := puffin.NewReader(bytes.NewReader(raw))
+       require.NoError(t, err)
+
+       assert.Empty(t, reader.Blobs())
+       assert.Empty(t, reader.Properties())
+}
+
+// TestReadJavaUncompressedMetricData reads the uncompressed metric data file
+// from the Java implementation and verifies all metadata and blob
+// contents match the expected values exactly.
+func TestReadJavaUncompressedMetricData(t *testing.T) {
+       data := readFixture(t, "sample-metric-data-uncompressed.bin")
+       r, err := puffin.NewReader(bytes.NewReader(data))
+       require.NoError(t, err)
+
+       // File-level properties
+       assert.Equal(t, "Test 1234", r.Properties()[puffin.CreatedBy])
+
+       // Should have exactly 2 blobs
+       blobs := r.Blobs()
+       require.Len(t, blobs, 2)
+
+       // Blob 0: type="some-blob"
+       assert.Equal(t, puffin.BlobType("some-blob"), blobs[0].Type)
+       assert.Equal(t, []int32{1}, blobs[0].Fields)
+       assert.Equal(t, int64(2), blobs[0].SnapshotID)
+       assert.Equal(t, int64(1), blobs[0].SequenceNumber)
+       assert.Equal(t, int64(4), blobs[0].Offset)
+       assert.Equal(t, int64(9), blobs[0].Length)
+       assert.Nil(t, blobs[0].CompressionCodec)
+
+       blob0, err := r.ReadBlob(0)
+       require.NoError(t, err)
+       assert.Equal(t, []byte("abcdefghi"), blob0.Data)
+
+       // Blob 1: type="some-other-blob", contains NUL byte and emoji
+       assert.Equal(t, puffin.BlobType("some-other-blob"), blobs[1].Type)
+       assert.Equal(t, []int32{2}, blobs[1].Fields)
+       assert.Equal(t, int64(2), blobs[1].SnapshotID)
+       assert.Equal(t, int64(1), blobs[1].SequenceNumber)
+       assert.Equal(t, int64(13), blobs[1].Offset)
+       assert.Equal(t, int64(83), blobs[1].Length)
+       assert.Nil(t, blobs[1].CompressionCodec)
+
+       blob1, err := r.ReadBlob(1)
+       require.NoError(t, err)
+       expectedBlob1 := []byte("some blob \x00 binary data \xf0\x9f\xa4\xaf 
that is not very very very very very very long, is it?")
+       assert.Equal(t, expectedBlob1, blob1.Data)
+       assert.Len(t, blob1.Data, 83)
+}
+
+// TestReadJavaCompressedMetricData opens the zstd-compressed Java fixture.
+// Opening the file and listing its two blobs must succeed; only reading a
+// blob's data is expected to fail with an error that mentions compression.
+func TestReadJavaCompressedMetricData(t *testing.T) {
+       raw := readFixture(t, "sample-metric-data-compressed-zstd.bin")
+       reader, err := puffin.NewReader(bytes.NewReader(raw))
+       require.NoError(t, err)
+
+       // The footer metadata is still available from the reader.
+       require.Len(t, reader.Blobs(), 2)
+
+       // Fetching the payload of a compressed blob is rejected.
+       _, readErr := reader.ReadBlob(0)
+       assert.ErrorContains(t, readErr, "compression")
+}
+
+// TestWriterBitIdenticalWithJava verifies that our writer produces
+// byte-identical output to the Java implementation's canonical fixture.
+func TestWriterBitIdenticalWithJava(t *testing.T) {
+       expected := readFixture(t, "sample-metric-data-uncompressed.bin")
+
+       w, buf := newWriter()
+       require.NoError(t, w.SetCreatedBy("Test 1234"))
+
+       _, err := w.AddBlob(puffin.BlobMetadataInput{
+               Type:           "some-blob",
+               Fields:         []int32{1},
+               SnapshotID:     2,
+               SequenceNumber: 1,
+       }, []byte("abcdefghi"))
+       require.NoError(t, err)
+
+       _, err = w.AddBlob(puffin.BlobMetadataInput{
+               Type:           "some-other-blob",
+               Fields:         []int32{2},
+               SnapshotID:     2,
+               SequenceNumber: 1,
+       }, []byte("some blob \x00 binary data \xf0\x9f\xa4\xaf that is not very 
very very very very very long, is it?"))
+       require.NoError(t, err)
+
+       require.NoError(t, w.Finish())
+       assert.Equal(t, expected, buf.Bytes())
+}
diff --git a/puffin/puffin_writer.go b/puffin/puffin_writer.go
new file mode 100644
index 00000000..fd0a39dd
--- /dev/null
+++ b/puffin/puffin_writer.go
@@ -0,0 +1,238 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package puffin
+
+import (
+       "encoding/binary"
+       "encoding/json"
+       "errors"
+       "fmt"
+       "io"
+       "math"
+
+       "github.com/apache/iceberg-go"
+)
+
+// Writer writes blobs and metadata to a Puffin file.
+//
+// Usage:
+//
+//     w, err := puffin.NewWriter(file)
+//     if err != nil {
+//         return err
+//     }
+//     _, err = w.AddBlob(puffin.BlobMetadataInput{
+//         Type:       puffin.BlobTypeDataSketchesTheta,
+//         SnapshotID: 123,
+//         Fields:     []int32{1},
+//     }, sketchBytes)
+//     if err != nil {
+//         return err
+//     }
+//     return w.Finish()
+type Writer struct {
+       w         io.Writer         // destination for the header magic, blob bytes and footer
+       offset    int64             // where the next blob starts: MagicSize initially, advanced by each blob's length
+       blobs     []BlobMetadata    // metadata of every blob added so far; serialized into the footer by Finish
+       props     map[string]string // file-level properties written to the footer
+       done      bool              // set by a successful Finish; checked by AddProperties, SetCreatedBy, AddBlob and Finish
+       createdBy string            // value stored under the CreatedBy footer property
+}
+
+// BlobMetadataInput contains the fields the caller provides when adding a blob.
+// Offset and Length are computed by the writer; AddBlob leaves CompressionCodec unset.
+type BlobMetadataInput struct {
+       Type           BlobType          // required; AddBlob rejects an empty type
+       SnapshotID     int64             // AddBlob requires -1 when Type is BlobTypeDeletionVector
+       SequenceNumber int64             // AddBlob requires -1 when Type is BlobTypeDeletionVector
+       Fields         []int32           // required; AddBlob rejects a nil slice
+       Properties     map[string]string // optional; passed through to the blob's metadata
+}
+
+// NewWriter returns a Writer that emits a Puffin file to w. The header magic is
+// written immediately, so the first blob starts at offset MagicSize. The
+// created-by value defaults to "iceberg-go " followed by iceberg.Version().
+//
+// An error is returned when w is nil or when writing the header magic fails.
+// Closing the underlying writer after Finish returns is left to the caller.
+func NewWriter(w io.Writer) (*Writer, error) {
+       if w == nil {
+               return nil, errors.New("puffin: writer is nil")
+       }
+
+       // The file begins with the magic bytes.
+       err := writeAll(w, magic[:])
+       if err != nil {
+               return nil, fmt.Errorf("puffin: write header magic: %w", err)
+       }
+
+       pw := &Writer{
+               w:         w,
+               offset:    MagicSize,
+               props:     map[string]string{},
+               createdBy: "iceberg-go " + iceberg.Version(),
+       }
+
+       return pw, nil
+}
+
+// SetProperties merges the provided properties into the file-level properties
+// written to the footer. Can be called multiple times before Finish.
+func (w *Writer) AddProperties(props map[string]string) error {
+       if w.done {
+               return errors.New("puffin: cannot set properties: writer 
already finalized")
+       }
+       for k, v := range props {
+               w.props[k] = v
+       }
+
+       return nil
+}
+
+// ClearProperties discards every file-level property added so far by replacing
+// the property set with an empty one. The created-by value is kept separately
+// and is not affected. Unlike AddProperties, it does not check whether the
+// writer has been finalized.
+func (w *Writer) ClearProperties() {
+       w.props = map[string]string{}
+}
+
+// SetCreatedBy overrides the default "created-by" property written to the 
footer.
+// The default value is "iceberg-go". Example: "MyApp version 1.2.3".
+func (w *Writer) SetCreatedBy(createdBy string) error {
+       if w.done {
+               return errors.New("puffin: cannot set created-by: writer 
already finalized")
+       }
+       if createdBy == "" {
+               return errors.New("puffin: cannot set created-by: value cannot 
be empty")
+       }
+       w.createdBy = createdBy
+
+       return nil
+}
+
+// AddBlob writes data to the underlying writer and records the blob's metadata
+// for the footer. It returns the complete BlobMetadata, including the Offset
+// and Length computed by the writer.
+//
+// input.Type is required; use the BlobType constants declared in this package
+// (for example BlobTypeDeletionVector). input.Fields must be non-nil. For
+// BlobTypeDeletionVector both SnapshotID and SequenceNumber must be -1.
+//
+// Fields and Properties are copied, so the caller may reuse or modify its own
+// slice and map after AddBlob returns without altering what Finish writes.
+//
+// An error is returned if the writer is already finalized, if validation fails
+// or if writing data fails. On a write error nothing is recorded and the
+// offset is not advanced; since part of the blob may already have reached the
+// underlying writer, the writer should not be used any further.
+func (w *Writer) AddBlob(input BlobMetadataInput, data []byte) (BlobMetadata, error) {
+       if w.done {
+               return BlobMetadata{}, errors.New("puffin: cannot add blob: writer already finalized")
+       }
+       if input.Type == "" {
+               return BlobMetadata{}, errors.New("puffin: cannot add blob: type is required")
+       }
+       if input.Fields == nil {
+               return BlobMetadata{}, errors.New("puffin: cannot add blob: fields is required")
+       }
+
+       // Deletion vectors have special requirements per spec
+       if input.Type == BlobTypeDeletionVector {
+               if input.SnapshotID != -1 {
+                       return BlobMetadata{}, errors.New("puffin: deletion-vector-v1 requires snapshot-id to be -1")
+               }
+               if input.SequenceNumber != -1 {
+                       return BlobMetadata{}, errors.New("puffin: deletion-vector-v1 requires sequence-number to be -1")
+               }
+       }
+
+       // The metadata is retained until Finish, so detach it from caller-owned
+       // storage. make keeps an empty Fields slice non-nil, as it was on input.
+       fields := make([]int32, len(input.Fields))
+       copy(fields, input.Fields)
+
+       // A nil Properties map stays nil so its representation is unchanged.
+       var props map[string]string
+       if input.Properties != nil {
+               props = make(map[string]string, len(input.Properties))
+               for k, v := range input.Properties {
+                       props[k] = v
+               }
+       }
+
+       meta := BlobMetadata{
+               Type:           input.Type,
+               SnapshotID:     input.SnapshotID,
+               SequenceNumber: input.SequenceNumber,
+               Fields:         fields,
+               Offset:         w.offset,
+               Length:         int64(len(data)),
+               Properties:     props,
+       }
+
+       if err := writeAll(w.w, data); err != nil {
+               return BlobMetadata{}, fmt.Errorf("puffin: write blob: %w", err)
+       }
+
+       w.offset += meta.Length
+       w.blobs = append(w.blobs, meta)
+
+       return meta, nil
+}
+
+// Finish serializes the footer and appends it to the underlying writer,
+// completing the Puffin file.
+//
+// The footer lists every blob added so far (an empty list when there are none)
+// and the file-level properties, to which the created-by value is added under
+// the CreatedBy key. It is emitted as the magic, the JSON payload and a trailer
+// made of the little-endian payload size, zeroed flags and the magic again
+// (4 bytes each).
+//
+// Finish returns an error if the writer is already finalized, if the footer
+// cannot be marshaled or is larger than math.MaxInt32 bytes, or if a write
+// fails. The writer is marked finalized only after all sections were written.
+func (w *Writer) Finish() error {
+       if w.done {
+               return errors.New("puffin: cannot finish: writer already finalized")
+       }
+
+       // created-by joins the file-level properties before it is decided
+       // whether there are any properties to emit at all.
+       if w.createdBy != "" {
+               w.props[CreatedBy] = w.createdBy
+       }
+
+       footer := Footer{Blobs: w.blobs}
+       if footer.Blobs == nil {
+               // Use an empty slice so the blob list is not marshaled as null.
+               footer.Blobs = []BlobMetadata{}
+       }
+       if len(w.props) > 0 {
+               footer.Properties = w.props
+       }
+
+       payload, err := json.Marshal(footer)
+       if err != nil {
+               return fmt.Errorf("puffin: marshal footer: %w", err)
+       }
+
+       // The trailer stores the payload size in 4 bytes; the limit is MaxInt32.
+       size := len(payload)
+       if size > math.MaxInt32 {
+               return fmt.Errorf("puffin: footer too large: %d bytes exceeds 2GB limit", size)
+       }
+
+       // Trailer layout: PayloadSize(4) + Flags(4) + Magic(4)
+       var trailer [footerTrailerSize]byte
+       binary.LittleEndian.PutUint32(trailer[0:4], uint32(size))
+       binary.LittleEndian.PutUint32(trailer[4:8], 0) // flags = 0 (uncompressed)
+       copy(trailer[8:12], magic[:])
+
+       // The three footer sections are written in file order.
+       sections := [...]struct {
+               label string
+               data  []byte
+       }{
+               {"footer magic", magic[:]},
+               {"footer payload", payload},
+               {"footer trailer", trailer[:]},
+       }
+       for _, section := range sections {
+               if err := writeAll(w.w, section.data); err != nil {
+                       return fmt.Errorf("puffin: write %s: %w", section.label, err)
+               }
+       }
+
+       w.done = true
+
+       return nil
+}
+
+// writeAll writes data to w with a single Write call and reports a failure
+// unless every byte was accepted.
+//
+// The io.Writer contract requires a non-nil error whenever n < len(data), but a
+// non-conforming writer can still return a short count together with a nil
+// error. That case is reported as an error wrapping io.ErrShortWrite, so
+// callers can detect it with errors.Is. The message text is unchanged, because
+// io.ErrShortWrite renders as "short write".
+func writeAll(w io.Writer, data []byte) error {
+       n, err := w.Write(data)
+       if err != nil {
+               return err
+       }
+       if n != len(data) {
+               return fmt.Errorf("%w: wrote %d of %d bytes", io.ErrShortWrite, n, len(data))
+       }
+
+       return nil
+}
diff --git a/puffin/testdata/README.md b/puffin/testdata/README.md
new file mode 100644
index 00000000..084df93c
--- /dev/null
+++ b/puffin/testdata/README.md
@@ -0,0 +1,21 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+These test fixture files are canonical Puffin files from the Apache Iceberg 
Java implementation:
+https://github.com/apache/iceberg/tree/main/core/src/test/resources/org/apache/iceberg/puffin/v1
diff --git a/puffin/testdata/empty-puffin-uncompressed.bin 
b/puffin/testdata/empty-puffin-uncompressed.bin
new file mode 100644
index 00000000..142b45bd
Binary files /dev/null and b/puffin/testdata/empty-puffin-uncompressed.bin 
differ
diff --git a/puffin/testdata/sample-metric-data-compressed-zstd.bin 
b/puffin/testdata/sample-metric-data-compressed-zstd.bin
new file mode 100644
index 00000000..ac8b69c7
Binary files /dev/null and 
b/puffin/testdata/sample-metric-data-compressed-zstd.bin differ
diff --git a/puffin/testdata/sample-metric-data-uncompressed.bin 
b/puffin/testdata/sample-metric-data-uncompressed.bin
new file mode 100644
index 00000000..ab8da138
Binary files /dev/null and 
b/puffin/testdata/sample-metric-data-uncompressed.bin differ


Reply via email to