zeroshade commented on code in PR #866:
URL: https://github.com/apache/iceberg-go/pull/866#discussion_r3082217171


##########
table/dv/deletion_vector.go:
##########
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package dv
+
+import (
+       "bytes"
+       "encoding/binary"
+       "fmt"
+       "hash/crc32"
+
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/puffin"
+)
+
+const (
+       // DVMagicNumber is the magic number for deletion vectors.
+       // Spec bytes: D1 D3 39 64 (big-endian) = 0x6439D3D1 (little-endian 
uint32)
+       DVMagicNumber uint32 = 0x6439D3D1

Review Comment:
   Will defining the constant this way work for big-endian architectures or do 
we need to add a conditional compilation etc? Can we just use the actual uint32 
value to avoid endian differences?



##########
table/dv/deletion_vector.go:
##########
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package dv
+
+import (
+       "bytes"
+       "encoding/binary"
+       "fmt"
+       "hash/crc32"
+
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/puffin"
+)
+
+const (
+       // DVMagicNumber is the magic number for deletion vectors.
+       // Spec bytes: D1 D3 39 64 (big-endian) = 0x6439D3D1 (little-endian 
uint32)
+       DVMagicNumber uint32 = 0x6439D3D1
+
+       dvLengthSize = 4 // length field
+       dvMagicSize  = 4 // magic field
+       dvCRCSize    = 4 // CRC-32 checksum
+       dvMinSize    = dvLengthSize + dvMagicSize + dvCRCSize
+)
+
+// DeserializeDV parses a deletion vector blob and returns a bitmap of deleted 
positions.
+//
+// The DV binary format is:
+//   - Length (4 bytes, big-endian): size of magic + bitmap data
+//   - Magic  (4 bytes, little-endian): must be 0x6439D3D1
+//   - Bitmap (variable): roaring bitmap in Iceberg portable format
+//   - CRC-32 (4 bytes, big-endian): checksum over magic + bitmap
+//
+// If expectedCardinality >= 0, the bitmap's cardinality is validated against 
it.
+func DeserializeDV(data []byte, expectedCardinality int64) 
(*RoaringPositionBitmap, error) {
+       if len(data) < dvMinSize {
+               return nil, fmt.Errorf("deletion vector payload too short: %d 
bytes (minimum %d)", len(data), dvMinSize)
+       }
+
+       // 1. Read and validate length
+       length := binary.BigEndian.Uint32(data[0:dvLengthSize])
+       expectedLength := uint32(len(data) - dvLengthSize - dvCRCSize)
+       if length != expectedLength {
+               return nil, fmt.Errorf("deletion vector length mismatch: got 
%d, expected %d", length, expectedLength)
+       }
+
+       // 2. Read and validate magic
+       magic := binary.LittleEndian.Uint32(data[dvLengthSize : 
dvLengthSize+dvMagicSize])
+       if magic != DVMagicNumber {
+               return nil, fmt.Errorf("invalid deletion vector magic: 0x%08x, 
expected 0x%08x", magic, DVMagicNumber)
+       }
+
+       // 3. Verify CRC-32 over magic + bitmap (bytes 4 to len-4)
+       bitmapDataStart := dvLengthSize
+       bitmapDataEnd := len(data) - dvCRCSize
+       computedCRC := crc32.ChecksumIEEE(data[bitmapDataStart:bitmapDataEnd])
+       expectedCRC := binary.BigEndian.Uint32(data[bitmapDataEnd:])
+       if computedCRC != expectedCRC {
+               return nil, fmt.Errorf("deletion vector CRC mismatch: computed 
0x%08x, expected 0x%08x", computedCRC, expectedCRC)
+       }
+
+       // 4. Deserialize roaring bitmap from the inner bytes (after length + 
magic, before CRC)
+       roaringStart := dvLengthSize + dvMagicSize
+       bitmap, err := 
DeserializeRoaringPositionBitmap(bytes.NewReader(data[roaringStart:bitmapDataEnd]))
+       if err != nil {
+               return nil, fmt.Errorf("deserialize deletion vector bitmap: 
%w", err)
+       }
+
+       // 5. Validate cardinality if requested
+       if expectedCardinality >= 0 {
+               actual := bitmap.Cardinality()
+               if actual != expectedCardinality {
+                       return nil, fmt.Errorf("deletion vector cardinality 
mismatch: got %d, expected %d", actual, expectedCardinality)
+               }
+       }
+
+       return bitmap, nil
+}
+
+// ReadDV reads a deletion vector from a puffin file using the manifest entry 
metadata.
+// ContentOffset and ContentSizeInBytes must be set on the DataFile (required 
by v3 spec).
+func ReadDV(fs iceio.IO, dvFile iceberg.DataFile) (*RoaringPositionBitmap, 
error) {
+       if dvFile.FileFormat() != iceberg.PuffinFile {
+               return nil, fmt.Errorf("expected PUFFIN format for deletion 
vector, got %s", dvFile.FileFormat())
+       }
+
+       if dvFile.ContentOffset() == nil || dvFile.ContentSizeInBytes() == nil {
+               return nil, fmt.Errorf("DV file %s missing 
ContentOffset/ContentSizeInBytes", dvFile.FilePath())
+       }
+
+       size := *dvFile.ContentSizeInBytes()
+       if size < 0 || size > int64(puffin.DefaultMaxBlobSize) {
+               return nil, fmt.Errorf("DV blob size %d out of valid range [0, 
%d]", size, puffin.DefaultMaxBlobSize)
+       }
+
+       f, err := fs.Open(dvFile.FilePath())
+       if err != nil {
+               return nil, fmt.Errorf("open DV file %s: %w", 
dvFile.FilePath(), err)
+       }
+       defer f.Close()
+
+       reader, err := puffin.NewReader(f)

Review Comment:
   does the puffin reader need a `defer reader.Close()`?



##########
table/dv/roaring_bitmap.go:
##########
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package dv
+
+import (
+       "encoding/binary"
+       "fmt"
+       "io"
+       "slices"
+       "sort"
+
+       "github.com/RoaringBitmap/roaring/v2"
+       "github.com/apache/iceberg-go/puffin"
+)
+
+// maxBitmapCount is the maximum number of 32-bit bitmap keys allowed during
+// deserialization. This prevents CPU/memory exhaustion from absurd counts
+// in malformed input. Derived from puffin.DefaultMaxBlobSize / 8 (minimum
+// per-bitmap overhead: 4-byte key + at least 4 bytes of roaring data).
+var maxBitmapCount = int64(puffin.DefaultMaxBlobSize / 8)
+
+// RoaringPositionBitmap supports 64-bit positions using a sparse map of
+// 32-bit Roaring bitmaps. Positions are split into a 32-bit key
+// (high bits) and 32-bit value (low bits).
+//
+// Compatible with the Java Iceberg RoaringPositionBitmap serialization format.
+type RoaringPositionBitmap struct {
+       bitmaps map[uint32]*roaring.Bitmap
+}
+
+// NewRoaringPositionBitmap creates an empty bitmap.
+func NewRoaringPositionBitmap() *RoaringPositionBitmap {
+       return &RoaringPositionBitmap{
+               bitmaps: make(map[uint32]*roaring.Bitmap),
+       }
+}
+
+// Set marks a position in the bitmap.
+// Position must be non-negative.
+func (b *RoaringPositionBitmap) Set(pos int64) {
+       if pos < 0 {
+               return
+       }
+       key := uint32(pos >> 32)
+       low := uint32(pos)
+       bm, ok := b.bitmaps[key]
+       if !ok {
+               bm = roaring.New()
+               b.bitmaps[key] = bm
+       }
+       bm.Add(low)
+}
+
+// Contains checks if a position is set.
+func (b *RoaringPositionBitmap) Contains(pos int64) bool {
+       if pos < 0 {
+               return false
+       }
+       key := uint32(pos >> 32)
+       low := uint32(pos)
+       bm, ok := b.bitmaps[key]
+       if !ok {
+               return false
+       }
+
+       return bm.Contains(low)
+}
+
+// IsEmpty returns true if no positions are set.
+func (b *RoaringPositionBitmap) IsEmpty() bool {
+       return b.Cardinality() == 0
+}
+
+// Cardinality returns the total number of set positions.
+func (b *RoaringPositionBitmap) Cardinality() int64 {
+       var c int64
+       for _, bm := range b.bitmaps {
+               c += int64(bm.GetCardinality())
+       }
+
+       return c
+}
+
+// Serialize writes in the Iceberg portable format (little-endian):
+//   - bitmap count (8 bytes, LE): number of non-empty bitmaps
+//   - for each bitmap in ascending key order: key (4 bytes, LE) + roaring 
portable data
+//
+// Only non-empty bitmaps are written, matching Java Iceberg behavior.
+func (b *RoaringPositionBitmap) Serialize(w io.Writer) error {
+       keys := make([]uint32, 0, len(b.bitmaps))
+       for k, bm := range b.bitmaps {
+               if bm.GetCardinality() > 0 {
+                       keys = append(keys, k)
+               }
+       }
+       sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
+
+       if err := binary.Write(w, binary.LittleEndian, int64(len(keys))); err 
!= nil {
+               return fmt.Errorf("write bitmap count: %w", err)
+       }
+       for _, key := range keys {
+               if err := binary.Write(w, binary.LittleEndian, key); err != nil 
{
+                       return fmt.Errorf("write key %d: %w", key, err)
+               }
+               if _, err := b.bitmaps[key].WriteTo(w); err != nil {
+                       return fmt.Errorf("write bitmap %d: %w", key, err)
+               }
+       }
+
+       return nil
+}
+
+// DeserializeRoaringPositionBitmap reads a bitmap from the Iceberg portable 
format.
+// Format: [count] { [key][bitmap] } .....{[key_n][bitmap_n]}
+func DeserializeRoaringPositionBitmap(r io.Reader) (*RoaringPositionBitmap, 
error) {
+       var count int64
+       if err := binary.Read(r, binary.LittleEndian, &count); err != nil {
+               return nil, fmt.Errorf("read bitmap count: %w", err)
+       }
+       if count < 0 {
+               return nil, fmt.Errorf("invalid bitmap count: %d", count)
+       }
+       if count > maxBitmapCount {
+               return nil, fmt.Errorf("bitmap count %d exceeds maximum allowed 
%d", count, maxBitmapCount)
+       }
+
+       b := &RoaringPositionBitmap{
+               bitmaps: make(map[uint32]*roaring.Bitmap, count),
+       }
+       var lastKey uint32
+       hasLastKey := false
+
+       for i := int64(0); i < count; i++ {
+               var key uint32
+               if err := binary.Read(r, binary.LittleEndian, &key); err != nil 
{
+                       return nil, fmt.Errorf("read key %d: %w", i, err)
+               }
+               if hasLastKey && key <= lastKey {
+                       return nil, fmt.Errorf("keys must be ascending: got %d 
after %d", key, lastKey)
+               }
+
+               bm := roaring.New()
+               if _, err := bm.ReadFrom(r); err != nil {
+                       return nil, fmt.Errorf("read bitmap for key %d: %w", 
key, err)
+               }
+               b.bitmaps[key] = bm
+               lastKey = key
+               hasLastKey = true
+       }
+
+       return b, nil
+}
+
+// sortedKeys returns the bitmap keys in ascending order.
+func (b *RoaringPositionBitmap) sortedKeys() []uint32 {
+       keys := make([]uint32, 0, len(b.bitmaps))
+       for k := range b.bitmaps {
+               keys = append(keys, k)
+       }
+       slices.Sort(keys)
+
+       return keys

Review Comment:
   ```suggestion
       return slices.Sorted(maps.Keys(b.bitmaps))
   ```



##########
table/dv/roaring_bitmap.go:
##########
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package dv
+
+import (
+       "encoding/binary"
+       "fmt"
+       "io"
+       "slices"
+       "sort"
+
+       "github.com/RoaringBitmap/roaring/v2"
+       "github.com/apache/iceberg-go/puffin"
+)
+
+// maxBitmapCount is the maximum number of 32-bit bitmap keys allowed during
+// deserialization. This prevents CPU/memory exhaustion from absurd counts
+// in malformed input. Derived from puffin.DefaultMaxBlobSize / 8 (minimum
+// per-bitmap overhead: 4-byte key + at least 4 bytes of roaring data).
+var maxBitmapCount = int64(puffin.DefaultMaxBlobSize / 8)
+
+// RoaringPositionBitmap supports 64-bit positions using a sparse map of
+// 32-bit Roaring bitmaps. Positions are split into a 32-bit key
+// (high bits) and 32-bit value (low bits).
+//
+// Compatible with the Java Iceberg RoaringPositionBitmap serialization format.
+type RoaringPositionBitmap struct {
+       bitmaps map[uint32]*roaring.Bitmap
+}
+
+// NewRoaringPositionBitmap creates an empty bitmap.
+func NewRoaringPositionBitmap() *RoaringPositionBitmap {
+       return &RoaringPositionBitmap{
+               bitmaps: make(map[uint32]*roaring.Bitmap),
+       }
+}
+
+// Set marks a position in the bitmap.
+// Position must be non-negative.
+func (b *RoaringPositionBitmap) Set(pos int64) {
+       if pos < 0 {
+               return
+       }
+       key := uint32(pos >> 32)
+       low := uint32(pos)
+       bm, ok := b.bitmaps[key]
+       if !ok {
+               bm = roaring.New()
+               b.bitmaps[key] = bm
+       }
+       bm.Add(low)
+}
+
+// Contains checks if a position is set.
+func (b *RoaringPositionBitmap) Contains(pos int64) bool {
+       if pos < 0 {
+               return false
+       }
+       key := uint32(pos >> 32)
+       low := uint32(pos)
+       bm, ok := b.bitmaps[key]
+       if !ok {
+               return false
+       }
+
+       return bm.Contains(low)
+}
+
+// IsEmpty returns true if no positions are set.
+func (b *RoaringPositionBitmap) IsEmpty() bool {
+       return b.Cardinality() == 0
+}
+
+// Cardinality returns the total number of set positions.
+func (b *RoaringPositionBitmap) Cardinality() int64 {
+       var c int64
+       for _, bm := range b.bitmaps {
+               c += int64(bm.GetCardinality())
+       }
+
+       return c
+}
+
+// Serialize writes in the Iceberg portable format (little-endian):
+//   - bitmap count (8 bytes, LE): number of non-empty bitmaps
+//   - for each bitmap in ascending key order: key (4 bytes, LE) + roaring 
portable data
+//
+// Only non-empty bitmaps are written, matching Java Iceberg behavior.
+func (b *RoaringPositionBitmap) Serialize(w io.Writer) error {
+       keys := make([]uint32, 0, len(b.bitmaps))
+       for k, bm := range b.bitmaps {
+               if bm.GetCardinality() > 0 {
+                       keys = append(keys, k)
+               }
+       }
+       sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
+
+       if err := binary.Write(w, binary.LittleEndian, int64(len(keys))); err 
!= nil {
+               return fmt.Errorf("write bitmap count: %w", err)
+       }
+       for _, key := range keys {
+               if err := binary.Write(w, binary.LittleEndian, key); err != nil 
{
+                       return fmt.Errorf("write key %d: %w", key, err)
+               }
+               if _, err := b.bitmaps[key].WriteTo(w); err != nil {
+                       return fmt.Errorf("write bitmap %d: %w", key, err)
+               }
+       }
+
+       return nil
+}
+
+// DeserializeRoaringPositionBitmap reads a bitmap from the Iceberg portable 
format.
+// Format: [count] { [key][bitmap] } .....{[key_n][bitmap_n]}
+func DeserializeRoaringPositionBitmap(r io.Reader) (*RoaringPositionBitmap, 
error) {
+       var count int64
+       if err := binary.Read(r, binary.LittleEndian, &count); err != nil {
+               return nil, fmt.Errorf("read bitmap count: %w", err)
+       }
+       if count < 0 {
+               return nil, fmt.Errorf("invalid bitmap count: %d", count)
+       }
+       if count > maxBitmapCount {
+               return nil, fmt.Errorf("bitmap count %d exceeds maximum allowed 
%d", count, maxBitmapCount)
+       }
+
+       b := &RoaringPositionBitmap{
+               bitmaps: make(map[uint32]*roaring.Bitmap, count),
+       }
+       var lastKey uint32
+       hasLastKey := false
+
+       for i := int64(0); i < count; i++ {

Review Comment:
   ```suggestion
        for i := range count {
   ```



##########
manifest.go:
##########
@@ -2123,10 +2126,10 @@ func NewDataFileBuilder(
                return nil, fmt.Errorf("%w: path cannot be empty", 
ErrInvalidArgument)
        }
 
-       if format != AvroFile && format != OrcFile && format != ParquetFile {
+       if format != AvroFile && format != OrcFile && format != ParquetFile && 
format != PuffinFile {

Review Comment:
   Agreed



##########
table/dv/roaring_bitmap.go:
##########
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "encoding/binary"
+       "fmt"
+       "io"
+
+       "github.com/RoaringBitmap/roaring/v2"
+)
+
+// RoaringPositionBitmap supports 64-bit positions using an array of
+// 32-bit Roaring bitmaps. Positions are split into a 32-bit key
+// (high bits) and 32-bit value (low bits).
+//
+// Compatible with the Java Iceberg RoaringPositionBitmap serialization format.
+type RoaringPositionBitmap struct {
+       bitmaps []*roaring.Bitmap // index = high 32 bits (key)
+}
+
+// NewRoaringPositionBitmap creates an empty bitmap.
+func NewRoaringPositionBitmap() *RoaringPositionBitmap {
+       return &RoaringPositionBitmap{}
+}
+
+// Set marks a position in the bitmap.
+func (b *RoaringPositionBitmap) Set(pos int64) {
+       key := int(pos >> 32)
+       low := uint32(pos)
+       b.grow(key + 1)
+       b.bitmaps[key].Add(low)
+}
+
+// Contains checks if a position is set.
+func (b *RoaringPositionBitmap) Contains(pos int64) bool {
+       key := int(pos >> 32)
+       low := uint32(pos)
+       if key >= len(b.bitmaps) {
+               return false
+       }
+
+       return b.bitmaps[key].Contains(low)
+}
+
+// IsEmpty returns true if no positions are set.
+func (b *RoaringPositionBitmap) IsEmpty() bool {
+       return b.Cardinality() == 0
+}
+
+// Cardinality returns the total number of set positions.
+func (b *RoaringPositionBitmap) Cardinality() int64 {
+       var c int64
+       for _, bm := range b.bitmaps {
+               c += int64(bm.GetCardinality())
+       }
+
+       return c
+}
+
+// Serialize writes in the Iceberg portable format (little-endian):
+//   - bitmap count (8 bytes, LE)
+//   - for each bitmap: key (4 bytes, LE) + roaring portable data
+func (b *RoaringPositionBitmap) Serialize(w io.Writer) error {

Review Comment:
   Agreed



##########
table/dv/deletion_vector.go:
##########
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package dv
+
+import (
+       "bytes"
+       "encoding/binary"
+       "fmt"
+       "hash/crc32"
+
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/puffin"
+)
+
+const (
+       // DVMagicNumber is the magic number for deletion vectors.
+       // Spec bytes: D1 D3 39 64 (big-endian) = 0x6439D3D1 (little-endian 
uint32)
+       DVMagicNumber uint32 = 0x6439D3D1
+
+       dvLengthSize = 4 // length field
+       dvMagicSize  = 4 // magic field
+       dvCRCSize    = 4 // CRC-32 checksum
+       dvMinSize    = dvLengthSize + dvMagicSize + dvCRCSize
+)
+
+// DeserializeDV parses a deletion vector blob and returns a bitmap of deleted 
positions.
+//
+// The DV binary format is:
+//   - Length (4 bytes, big-endian): size of magic + bitmap data

Review Comment:
   does the length include the CRC32 bytes also? 



##########
table/dv/deletion_vector.go:
##########
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package dv
+
+import (
+       "bytes"
+       "encoding/binary"
+       "fmt"
+       "hash/crc32"
+
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/puffin"
+)
+
+const (
+       // DVMagicNumber is the magic number for deletion vectors.
+       // Spec bytes: D1 D3 39 64 (big-endian) = 0x6439D3D1 (little-endian 
uint32)
+       DVMagicNumber uint32 = 0x6439D3D1
+
+       dvLengthSize = 4 // length field
+       dvMagicSize  = 4 // magic field
+       dvCRCSize    = 4 // CRC-32 checksum
+       dvMinSize    = dvLengthSize + dvMagicSize + dvCRCSize
+)
+
+// DeserializeDV parses a deletion vector blob and returns a bitmap of deleted 
positions.
+//
+// The DV binary format is:
+//   - Length (4 bytes, big-endian): size of magic + bitmap data
+//   - Magic  (4 bytes, little-endian): must be 0x6439D3D1
+//   - Bitmap (variable): roaring bitmap in Iceberg portable format
+//   - CRC-32 (4 bytes, big-endian): checksum over magic + bitmap

Review Comment:
   it's super weird that the spec jumps back and forth between big and little 
endian representations....



##########
table/dv/roaring_bitmap.go:
##########
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package dv
+
+import (
+       "encoding/binary"
+       "fmt"
+       "io"
+       "slices"
+       "sort"
+
+       "github.com/RoaringBitmap/roaring/v2"
+       "github.com/apache/iceberg-go/puffin"
+)
+
+// maxBitmapCount is the maximum number of 32-bit bitmap keys allowed during
+// deserialization. This prevents CPU/memory exhaustion from absurd counts
+// in malformed input. Derived from puffin.DefaultMaxBlobSize / 8 (minimum
+// per-bitmap overhead: 4-byte key + at least 4 bytes of roaring data).
+var maxBitmapCount = int64(puffin.DefaultMaxBlobSize / 8)
+
+// RoaringPositionBitmap supports 64-bit positions using a sparse map of
+// 32-bit Roaring bitmaps. Positions are split into a 32-bit key
+// (high bits) and 32-bit value (low bits).
+//
+// Compatible with the Java Iceberg RoaringPositionBitmap serialization format.
+type RoaringPositionBitmap struct {
+       bitmaps map[uint32]*roaring.Bitmap
+}
+
+// NewRoaringPositionBitmap creates an empty bitmap.
+func NewRoaringPositionBitmap() *RoaringPositionBitmap {
+       return &RoaringPositionBitmap{
+               bitmaps: make(map[uint32]*roaring.Bitmap),
+       }
+}
+
+// Set marks a position in the bitmap.
+// Position must be non-negative.
+func (b *RoaringPositionBitmap) Set(pos int64) {
+       if pos < 0 {
+               return
+       }
+       key := uint32(pos >> 32)
+       low := uint32(pos)
+       bm, ok := b.bitmaps[key]
+       if !ok {
+               bm = roaring.New()
+               b.bitmaps[key] = bm
+       }
+       bm.Add(low)
+}
+
+// Contains checks if a position is set.
+func (b *RoaringPositionBitmap) Contains(pos int64) bool {
+       if pos < 0 {
+               return false
+       }
+       key := uint32(pos >> 32)
+       low := uint32(pos)
+       bm, ok := b.bitmaps[key]
+       if !ok {
+               return false
+       }
+
+       return bm.Contains(low)
+}
+
+// IsEmpty returns true if no positions are set.
+func (b *RoaringPositionBitmap) IsEmpty() bool {
+       return b.Cardinality() == 0
+}
+
+// Cardinality returns the total number of set positions.
+func (b *RoaringPositionBitmap) Cardinality() int64 {
+       var c int64
+       for _, bm := range b.bitmaps {
+               c += int64(bm.GetCardinality())
+       }
+
+       return c
+}
+
+// Serialize writes in the Iceberg portable format (little-endian):
+//   - bitmap count (8 bytes, LE): number of non-empty bitmaps
+//   - for each bitmap in ascending key order: key (4 bytes, LE) + roaring 
portable data
+//
+// Only non-empty bitmaps are written, matching Java Iceberg behavior.
+func (b *RoaringPositionBitmap) Serialize(w io.Writer) error {
+       keys := make([]uint32, 0, len(b.bitmaps))
+       for k, bm := range b.bitmaps {
+               if bm.GetCardinality() > 0 {
+                       keys = append(keys, k)
+               }
+       }
+       sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
+
+       if err := binary.Write(w, binary.LittleEndian, int64(len(keys))); err 
!= nil {
+               return fmt.Errorf("write bitmap count: %w", err)
+       }
+       for _, key := range keys {
+               if err := binary.Write(w, binary.LittleEndian, key); err != nil 
{
+                       return fmt.Errorf("write key %d: %w", key, err)
+               }
+               if _, err := b.bitmaps[key].WriteTo(w); err != nil {
+                       return fmt.Errorf("write bitmap %d: %w", key, err)
+               }
+       }
+
+       return nil
+}
+
+// DeserializeRoaringPositionBitmap reads a bitmap from the Iceberg portable 
format.
+// Format: [count] { [key][bitmap] } .....{[key_n][bitmap_n]}
+func DeserializeRoaringPositionBitmap(r io.Reader) (*RoaringPositionBitmap, 
error) {

Review Comment:
   If the only current use for this always going to have the whole byte slice 
and then have to wrap it with `bytes.NewReader` why not just take a byte slice 
here instead and avoid the trip through `io.Reader`?



##########
table/dv/roaring_bitmap.go:
##########
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "encoding/binary"
+       "fmt"
+       "io"
+
+       "github.com/RoaringBitmap/roaring/v2"
+)
+
+// RoaringPositionBitmap supports 64-bit positions using an array of
+// 32-bit Roaring bitmaps. Positions are split into a 32-bit key
+// (high bits) and 32-bit value (low bits).
+//
+// Compatible with the Java Iceberg RoaringPositionBitmap serialization format.
+type RoaringPositionBitmap struct {
+       bitmaps []*roaring.Bitmap // index = high 32 bits (key)
+}
+
+// NewRoaringPositionBitmap creates an empty bitmap.
+func NewRoaringPositionBitmap() *RoaringPositionBitmap {
+       return &RoaringPositionBitmap{}
+}
+
+// Set marks a position in the bitmap.
+func (b *RoaringPositionBitmap) Set(pos int64) {
+       key := int(pos >> 32)
+       low := uint32(pos)
+       b.grow(key + 1)
+       b.bitmaps[key].Add(low)
+}
+
+// Contains checks if a position is set.
+func (b *RoaringPositionBitmap) Contains(pos int64) bool {

Review Comment:
   Same question, can we just use `uint64` to alleviate the need for the check?



##########
table/dv/roaring_bitmap.go:
##########
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "encoding/binary"
+       "fmt"
+       "io"
+
+       "github.com/RoaringBitmap/roaring/v2"
+)
+
+// RoaringPositionBitmap supports 64-bit positions using an array of
+// 32-bit Roaring bitmaps. Positions are split into a 32-bit key
+// (high bits) and 32-bit value (low bits).
+//
+// Compatible with the Java Iceberg RoaringPositionBitmap serialization format.
+type RoaringPositionBitmap struct {
+       bitmaps []*roaring.Bitmap // index = high 32 bits (key)
+}
+
+// NewRoaringPositionBitmap creates an empty bitmap.
+func NewRoaringPositionBitmap() *RoaringPositionBitmap {
+       return &RoaringPositionBitmap{}
+}
+
+// Set marks a position in the bitmap.
+func (b *RoaringPositionBitmap) Set(pos int64) {

Review Comment:
   any reason we can't just use `uint64` to alleviate the need for the check?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to