laskoviymishka commented on code in PR #866:
URL: https://github.com/apache/iceberg-go/pull/866#discussion_r3059355538


##########
table/roaring_bitmap.go:
##########
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "encoding/binary"
+       "fmt"
+       "io"
+
+       "github.com/RoaringBitmap/roaring/v2"
+)
+
+// RoaringPositionBitmap supports 64-bit positions using an array of
+// 32-bit Roaring bitmaps. Positions are split into a 32-bit key
+// (high bits) and 32-bit value (low bits).
+//
+// Compatible with the Java Iceberg RoaringPositionBitmap serialization format.
+type RoaringPositionBitmap struct {
+       bitmaps []*roaring.Bitmap // index = high 32 bits (key)
+}
+
+// NewRoaringPositionBitmap creates an empty bitmap.
+func NewRoaringPositionBitmap() *RoaringPositionBitmap {
+       return &RoaringPositionBitmap{}
+}
+
+// Set marks a position in the bitmap.
+func (b *RoaringPositionBitmap) Set(pos int64) {
+       key := int(pos >> 32)
+       low := uint32(pos)
+       b.grow(key + 1)
+       b.bitmaps[key].Add(low)
+}
+
+// Contains checks if a position is set.
+func (b *RoaringPositionBitmap) Contains(pos int64) bool {
+       key := int(pos >> 32)
+       low := uint32(pos)
+       if key >= len(b.bitmaps) {
+               return false
+       }
+
+       return b.bitmaps[key].Contains(low)
+}
+
+// IsEmpty returns true if no positions are set.
+func (b *RoaringPositionBitmap) IsEmpty() bool {
+       return b.Cardinality() == 0
+}
+
+// Cardinality returns the total number of set positions.
+func (b *RoaringPositionBitmap) Cardinality() int64 {
+       var c int64
+       for _, bm := range b.bitmaps {
+               c += int64(bm.GetCardinality())
+       }
+
+       return c
+}
+
+// Serialize writes in the Iceberg portable format (little-endian):
+//   - bitmap count (8 bytes, LE)
+//   - for each bitmap: key (4 bytes, LE) + roaring portable data
+func (b *RoaringPositionBitmap) Serialize(w io.Writer) error {
+       if err := binary.Write(w, binary.LittleEndian, int64(len(b.bitmaps))); 
err != nil {
+               return fmt.Errorf("write bitmap count: %w", err)
+       }
+       for key, bm := range b.bitmaps {
+               if err := binary.Write(w, binary.LittleEndian, uint32(key)); 
err != nil {
+                       return fmt.Errorf("write key %d: %w", key, err)
+               }
+               if _, err := bm.WriteTo(w); err != nil {
+                       return fmt.Errorf("write bitmap %d: %w", key, err)
+               }
+       }
+
+       return nil
+}
+
+// DeserializeRoaringPositionBitmap reads a bitmap from the Iceberg portable 
format.
+func DeserializeRoaringPositionBitmap(r io.Reader) (*RoaringPositionBitmap, 
error) {
+       var count int64
+       if err := binary.Read(r, binary.LittleEndian, &count); err != nil {
+               return nil, fmt.Errorf("read bitmap count: %w", err)
+       }
+       if count < 0 {
+               return nil, fmt.Errorf("invalid bitmap count: %d", count)
+       }
+
+       b := &RoaringPositionBitmap{}
+       lastKey := -1
+
+       for i := int64(0); i < count; i++ {

Review Comment:
   count is validated for < 0 but not capped:
   
   count = math.MaxInt64 will spin the loop trying to allocate. 
   
   i think we need here a reasonable max (something like puffin 
DefaultMaxBlobSize / minimum-bitmap-size)



##########
manifest.go:
##########
@@ -2123,10 +2126,10 @@ func NewDataFileBuilder(
                return nil, fmt.Errorf("%w: path cannot be empty", 
ErrInvalidArgument)
        }
 
-       if format != AvroFile && format != OrcFile && format != ParquetFile {
+       if format != AvroFile && format != OrcFile && format != ParquetFile && 
format != PuffinFile {

Review Comment:
   PuffinFile should probably only be valid when content == 
EntryContentPosDeletes — right now a caller could create a data file 
(content=DATA) with PUFFIN format and this check wouldn't catch it



##########
table/deletion_vector.go:
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table

Review Comment:
   i'm sart to think that moving this to separate package may be a good idea, 
in general table package is quite large.
   
   wdyt?



##########
table/roaring_bitmap_test.go:
##########
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "bytes"
+       "encoding/binary"
+       "os"
+       "path/filepath"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+func readBitmapTestData(t *testing.T, name string) []byte {

Review Comment:
   is this a duplicate function? 
   can we re-use readDVTestData from other test here?



##########
table/deletion_vector_test.go:
##########
@@ -0,0 +1,282 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "encoding/binary"
+       "errors"
+       "hash/crc32"
+       "os"
+       "path/filepath"
+       "testing"
+
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/puffin"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+func readDVTestData(t *testing.T, name string) []byte {

Review Comment:
   am i reading this right:
   - 64map32bitvals.bin, 
   - all-container-types-position-index.bin
   - empty-position-index.bin
   - small-and-large-values-position-index.bin
   
   never actually reads here, why then add them into testdata?
   i think it make sense to include coverage for them



##########
table/roaring_bitmap.go:
##########
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "encoding/binary"
+       "fmt"
+       "io"
+
+       "github.com/RoaringBitmap/roaring/v2"
+)
+
+// RoaringPositionBitmap supports 64-bit positions using an array of
+// 32-bit Roaring bitmaps. Positions are split into a 32-bit key
+// (high bits) and 32-bit value (low bits).
+//
+// Compatible with the Java Iceberg RoaringPositionBitmap serialization format.
+type RoaringPositionBitmap struct {
+       bitmaps []*roaring.Bitmap // index = high 32 bits (key)
+}
+
+// NewRoaringPositionBitmap creates an empty bitmap.
+func NewRoaringPositionBitmap() *RoaringPositionBitmap {
+       return &RoaringPositionBitmap{}
+}
+
+// Set marks a position in the bitmap.
+func (b *RoaringPositionBitmap) Set(pos int64) {
+       key := int(pos >> 32)
+       low := uint32(pos)
+       b.grow(key + 1)
+       b.bitmaps[key].Add(low)
+}
+
+// Contains checks if a position is set.
+func (b *RoaringPositionBitmap) Contains(pos int64) bool {
+       key := int(pos >> 32)
+       low := uint32(pos)
+       if key >= len(b.bitmaps) {
+               return false
+       }
+
+       return b.bitmaps[key].Contains(low)
+}
+
+// IsEmpty returns true if no positions are set.
+func (b *RoaringPositionBitmap) IsEmpty() bool {
+       return b.Cardinality() == 0
+}
+
+// Cardinality returns the total number of set positions.
+func (b *RoaringPositionBitmap) Cardinality() int64 {
+       var c int64
+       for _, bm := range b.bitmaps {
+               c += int64(bm.GetCardinality())
+       }
+
+       return c
+}
+
+// Serialize writes in the Iceberg portable format (little-endian):
+//   - bitmap count (8 bytes, LE)
+//   - for each bitmap: key (4 bytes, LE) + roaring portable data
+func (b *RoaringPositionBitmap) Serialize(w io.Writer) error {

Review Comment:
   Serialize writes len(b.bitmaps) as count including empty gap fillers — Java 
only writes non-empty bitmaps. Set((5<<32)|1) produces count=6 here but count=1 
in Java. 
   
   this will break cross-client interop when the write path ships. either skip 
empties or unexport until then



##########
table/roaring_bitmap.go:
##########
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "encoding/binary"
+       "fmt"
+       "io"
+
+       "github.com/RoaringBitmap/roaring/v2"
+)
+
+// RoaringPositionBitmap supports 64-bit positions using an array of
+// 32-bit Roaring bitmaps. Positions are split into a 32-bit key
+// (high bits) and 32-bit value (low bits).
+//
+// Compatible with the Java Iceberg RoaringPositionBitmap serialization format.
+type RoaringPositionBitmap struct {
+       bitmaps []*roaring.Bitmap // index = high 32 bits (key)
+}
+
+// NewRoaringPositionBitmap creates an empty bitmap.
+func NewRoaringPositionBitmap() *RoaringPositionBitmap {
+       return &RoaringPositionBitmap{}
+}
+
+// Set marks a position in the bitmap.
+func (b *RoaringPositionBitmap) Set(pos int64) {

Review Comment:
   need a `if pos < 0 {` guard here
   
   negative pos → negative key via arithmetic shift → grow is a no-op → 
bitmaps[key] panics with index-out-of-range.



##########
table/deletion_vector.go:
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "bytes"
+       "encoding/binary"
+       "fmt"
+       "hash/crc32"
+
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/puffin"
+)
+
+const (
+       // DVMagicNumber is the magic number for deletion vectors.
+       // Spec bytes: D1 D3 39 64 (big-endian) = 0x6439D3D1 (little-endian 
uint32)
+       DVMagicNumber uint32 = 0x6439D3D1
+
+       dvLengthSize = 4 // length field
+       dvMagicSize  = 4 // magic field
+       dvCRCSize    = 4 // CRC-32 checksum
+       dvMinSize    = dvLengthSize + dvMagicSize + dvCRCSize
+)
+
+// DeserializeDV parses a deletion vector blob and returns a bitmap of deleted 
positions.
+//
+// The DV binary format is:
+//   - Length (4 bytes, big-endian): size of magic + bitmap data
+//   - Magic  (4 bytes, little-endian): must be 0x6439D3D1
+//   - Bitmap (variable): roaring bitmap in Iceberg portable format
+//   - CRC-32 (4 bytes, big-endian): checksum over magic + bitmap
+//
+// If expectedCardinality >= 0, the bitmap's cardinality is validated against 
it.
+func DeserializeDV(data []byte, expectedCardinality int64) 
(*RoaringPositionBitmap, error) {
+       if len(data) < dvMinSize {
+               return nil, fmt.Errorf("deletion vector payload too short: %d 
bytes (minimum %d)", len(data), dvMinSize)
+       }
+
+       // 1. Read and validate length
+       length := binary.BigEndian.Uint32(data[0:dvLengthSize])
+       expectedLength := uint32(len(data) - dvLengthSize - dvCRCSize)
+       if length != expectedLength {
+               return nil, fmt.Errorf("deletion vector length mismatch: got 
%d, expected %d", length, expectedLength)
+       }
+
+       // 2. Read and validate magic
+       magic := binary.LittleEndian.Uint32(data[dvLengthSize : 
dvLengthSize+dvMagicSize])
+       if magic != DVMagicNumber {
+               return nil, fmt.Errorf("invalid deletion vector magic: 0x%08x, 
expected 0x%08x", magic, DVMagicNumber)
+       }
+
+       // 3. Verify CRC-32 over magic + bitmap (bytes 4 to len-4)
+       bitmapDataStart := dvLengthSize
+       bitmapDataEnd := len(data) - dvCRCSize
+       computedCRC := crc32.ChecksumIEEE(data[bitmapDataStart:bitmapDataEnd])
+       expectedCRC := binary.BigEndian.Uint32(data[bitmapDataEnd:])
+       if computedCRC != expectedCRC {
+               return nil, fmt.Errorf("deletion vector CRC mismatch: computed 
0x%08x, expected 0x%08x", computedCRC, expectedCRC)
+       }
+
+       // 4. Deserialize roaring bitmap from the inner bytes (after length + 
magic, before CRC)
+       roaringStart := dvLengthSize + dvMagicSize
+       bitmap, err := 
DeserializeRoaringPositionBitmap(bytes.NewReader(data[roaringStart:bitmapDataEnd]))
+       if err != nil {
+               return nil, fmt.Errorf("deserialize deletion vector bitmap: 
%w", err)
+       }
+
+       // 5. Validate cardinality if requested
+       if expectedCardinality >= 0 {
+               actual := bitmap.Cardinality()
+               if actual != expectedCardinality {
+                       return nil, fmt.Errorf("deletion vector cardinality 
mismatch: got %d, expected %d", actual, expectedCardinality)
+               }
+       }
+
+       return bitmap, nil
+}
+
+// ReadDV reads a deletion vector from a puffin file using the manifest entry 
metadata.
+// ContentOffset and ContentSizeInBytes must be set on the DataFile (required 
by v3 spec).
+func ReadDV(fs iceio.IO, dvFile iceberg.DataFile) (*RoaringPositionBitmap, 
error) {
+       if dvFile.FileFormat() != iceberg.PuffinFile {
+               return nil, fmt.Errorf("expected PUFFIN format for deletion 
vector, got %s", dvFile.FileFormat())
+       }
+
+       if dvFile.ContentOffset() == nil || dvFile.ContentSizeInBytes() == nil {
+               return nil, fmt.Errorf("DV file %s missing 
ContentOffset/ContentSizeInBytes", dvFile.FilePath())
+       }
+
+       f, err := fs.Open(dvFile.FilePath())
+       if err != nil {
+               return nil, fmt.Errorf("open DV file %s: %w", 
dvFile.FilePath(), err)
+       }
+       defer f.Close()
+
+       reader, err := puffin.NewReader(f)
+       if err != nil {
+               return nil, fmt.Errorf("create puffin reader for %s: %w", 
dvFile.FilePath(), err)
+       }
+
+       offset := *dvFile.ContentOffset()
+       size := *dvFile.ContentSizeInBytes()
+       blobData := make([]byte, size)
+       if _, err := reader.ReadAt(blobData, offset); err != nil {
+               return nil, fmt.Errorf("read DV blob at offset %d: %w", offset, 
err)
+       }
+
+       return DeserializeDV(blobData, dvFile.Count())

Review Comment:
   we always passes dvFile.Count() as expectedCardinality. Count() returns 0 
when not set in manifest. 
   
   DeserializeDV treats >= 0 as "validate" → will reject any real DV when count 
defaults to 0. 
   
   Need either pass -1 always and let caller validate, or use *int64 for the 
parameter



##########
table/deletion_vector.go:
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "bytes"
+       "encoding/binary"
+       "fmt"
+       "hash/crc32"
+
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/puffin"
+)
+
+const (
+       // DVMagicNumber is the magic number for deletion vectors.
+       // Spec bytes: D1 D3 39 64 (big-endian) = 0x6439D3D1 (little-endian 
uint32)
+       DVMagicNumber uint32 = 0x6439D3D1
+
+       dvLengthSize = 4 // length field
+       dvMagicSize  = 4 // magic field
+       dvCRCSize    = 4 // CRC-32 checksum
+       dvMinSize    = dvLengthSize + dvMagicSize + dvCRCSize
+)
+
+// DeserializeDV parses a deletion vector blob and returns a bitmap of deleted 
positions.
+//
+// The DV binary format is:
+//   - Length (4 bytes, big-endian): size of magic + bitmap data
+//   - Magic  (4 bytes, little-endian): must be 0x6439D3D1
+//   - Bitmap (variable): roaring bitmap in Iceberg portable format
+//   - CRC-32 (4 bytes, big-endian): checksum over magic + bitmap
+//
+// If expectedCardinality >= 0, the bitmap's cardinality is validated against 
it.
+func DeserializeDV(data []byte, expectedCardinality int64) 
(*RoaringPositionBitmap, error) {
+       if len(data) < dvMinSize {
+               return nil, fmt.Errorf("deletion vector payload too short: %d 
bytes (minimum %d)", len(data), dvMinSize)
+       }
+
+       // 1. Read and validate length
+       length := binary.BigEndian.Uint32(data[0:dvLengthSize])
+       expectedLength := uint32(len(data) - dvLengthSize - dvCRCSize)
+       if length != expectedLength {
+               return nil, fmt.Errorf("deletion vector length mismatch: got 
%d, expected %d", length, expectedLength)
+       }
+
+       // 2. Read and validate magic
+       magic := binary.LittleEndian.Uint32(data[dvLengthSize : 
dvLengthSize+dvMagicSize])
+       if magic != DVMagicNumber {
+               return nil, fmt.Errorf("invalid deletion vector magic: 0x%08x, 
expected 0x%08x", magic, DVMagicNumber)
+       }
+
+       // 3. Verify CRC-32 over magic + bitmap (bytes 4 to len-4)
+       bitmapDataStart := dvLengthSize
+       bitmapDataEnd := len(data) - dvCRCSize
+       computedCRC := crc32.ChecksumIEEE(data[bitmapDataStart:bitmapDataEnd])
+       expectedCRC := binary.BigEndian.Uint32(data[bitmapDataEnd:])
+       if computedCRC != expectedCRC {
+               return nil, fmt.Errorf("deletion vector CRC mismatch: computed 
0x%08x, expected 0x%08x", computedCRC, expectedCRC)
+       }
+
+       // 4. Deserialize roaring bitmap from the inner bytes (after length + 
magic, before CRC)
+       roaringStart := dvLengthSize + dvMagicSize
+       bitmap, err := 
DeserializeRoaringPositionBitmap(bytes.NewReader(data[roaringStart:bitmapDataEnd]))
+       if err != nil {
+               return nil, fmt.Errorf("deserialize deletion vector bitmap: 
%w", err)
+       }
+
+       // 5. Validate cardinality if requested
+       if expectedCardinality >= 0 {
+               actual := bitmap.Cardinality()
+               if actual != expectedCardinality {
+                       return nil, fmt.Errorf("deletion vector cardinality 
mismatch: got %d, expected %d", actual, expectedCardinality)
+               }
+       }
+
+       return bitmap, nil
+}
+
+// ReadDV reads a deletion vector from a puffin file using the manifest entry 
metadata.
+// ContentOffset and ContentSizeInBytes must be set on the DataFile (required 
by v3 spec).
+func ReadDV(fs iceio.IO, dvFile iceberg.DataFile) (*RoaringPositionBitmap, 
error) {
+       if dvFile.FileFormat() != iceberg.PuffinFile {
+               return nil, fmt.Errorf("expected PUFFIN format for deletion 
vector, got %s", dvFile.FileFormat())
+       }
+
+       if dvFile.ContentOffset() == nil || dvFile.ContentSizeInBytes() == nil {
+               return nil, fmt.Errorf("DV file %s missing 
ContentOffset/ContentSizeInBytes", dvFile.FilePath())
+       }
+
+       f, err := fs.Open(dvFile.FilePath())
+       if err != nil {
+               return nil, fmt.Errorf("open DV file %s: %w", 
dvFile.FilePath(), err)
+       }
+       defer f.Close()
+
+       reader, err := puffin.NewReader(f)
+       if err != nil {
+               return nil, fmt.Errorf("create puffin reader for %s: %w", 
dvFile.FilePath(), err)
+       }
+
+       offset := *dvFile.ContentOffset()
+       size := *dvFile.ContentSizeInBytes()
+       blobData := make([]byte, size)

Review Comment:
   need a guard: `if size < 0 || size >
     DefaultMaxBlobSize { return error }`
   
   this would panic on negative or huge ContentSizeInBytes



##########
table/roaring_bitmap.go:
##########
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "encoding/binary"
+       "fmt"
+       "io"
+
+       "github.com/RoaringBitmap/roaring/v2"
+)
+
+// RoaringPositionBitmap supports 64-bit positions using an array of
+// 32-bit Roaring bitmaps. Positions are split into a 32-bit key
+// (high bits) and 32-bit value (low bits).
+//
+// Compatible with the Java Iceberg RoaringPositionBitmap serialization format.
+type RoaringPositionBitmap struct {
+       bitmaps []*roaring.Bitmap // index = high 32 bits (key)
+}
+
+// NewRoaringPositionBitmap creates an empty bitmap.
+func NewRoaringPositionBitmap() *RoaringPositionBitmap {
+       return &RoaringPositionBitmap{}
+}
+
+// Set marks a position in the bitmap.
+func (b *RoaringPositionBitmap) Set(pos int64) {
+       key := int(pos >> 32)
+       low := uint32(pos)
+       b.grow(key + 1)
+       b.bitmaps[key].Add(low)
+}
+
+// Contains checks if a position is set.
+func (b *RoaringPositionBitmap) Contains(pos int64) bool {
+       key := int(pos >> 32)
+       low := uint32(pos)
+       if key >= len(b.bitmaps) {
+               return false
+       }
+
+       return b.bitmaps[key].Contains(low)
+}
+
+// IsEmpty returns true if no positions are set.
+func (b *RoaringPositionBitmap) IsEmpty() bool {
+       return b.Cardinality() == 0
+}
+
+// Cardinality returns the total number of set positions.
+func (b *RoaringPositionBitmap) Cardinality() int64 {
+       var c int64
+       for _, bm := range b.bitmaps {
+               c += int64(bm.GetCardinality())
+       }
+
+       return c
+}
+
+// Serialize writes in the Iceberg portable format (little-endian):
+//   - bitmap count (8 bytes, LE)
+//   - for each bitmap: key (4 bytes, LE) + roaring portable data
+func (b *RoaringPositionBitmap) Serialize(w io.Writer) error {
+       if err := binary.Write(w, binary.LittleEndian, int64(len(b.bitmaps))); 
err != nil {
+               return fmt.Errorf("write bitmap count: %w", err)
+       }
+       for key, bm := range b.bitmaps {
+               if err := binary.Write(w, binary.LittleEndian, uint32(key)); 
err != nil {
+                       return fmt.Errorf("write key %d: %w", key, err)
+               }
+               if _, err := bm.WriteTo(w); err != nil {
+                       return fmt.Errorf("write bitmap %d: %w", key, err)
+               }
+       }
+
+       return nil
+}
+
+// DeserializeRoaringPositionBitmap reads a bitmap from the Iceberg portable 
format.
+func DeserializeRoaringPositionBitmap(r io.Reader) (*RoaringPositionBitmap, 
error) {
+       var count int64
+       if err := binary.Read(r, binary.LittleEndian, &count); err != nil {
+               return nil, fmt.Errorf("read bitmap count: %w", err)
+       }
+       if count < 0 {
+               return nil, fmt.Errorf("invalid bitmap count: %d", count)
+       }
+
+       b := &RoaringPositionBitmap{}
+       lastKey := -1
+
+       for i := int64(0); i < count; i++ {
+               var key uint32
+               if err := binary.Read(r, binary.LittleEndian, &key); err != nil 
{
+                       return nil, fmt.Errorf("read key %d: %w", i, err)
+               }
+               if int(key) <= lastKey {
+                       return nil, fmt.Errorf("keys must be ascending: got %d 
after %d", key, lastKey)
+               }
+
+               // fill gaps with empty bitmaps
+               for len(b.bitmaps) < int(key) {

Review Comment:
   can OOM with cracked input: keys [0, 0xFFFFFFFF] with count=2 allocates ~4 
billion empty bitmaps. 
   
   maybe use a `map[uint32]*roaring.Bitmap` or cap the max key



##########
table/roaring_bitmap.go:
##########
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "encoding/binary"
+       "fmt"
+       "io"
+
+       "github.com/RoaringBitmap/roaring/v2"
+)
+
+// RoaringPositionBitmap supports 64-bit positions using an array of
+// 32-bit Roaring bitmaps. Positions are split into a 32-bit key
+// (high bits) and 32-bit value (low bits).
+//
+// Compatible with the Java Iceberg RoaringPositionBitmap serialization format.
+type RoaringPositionBitmap struct {
+       bitmaps []*roaring.Bitmap // index = high 32 bits (key)
+}
+
+// NewRoaringPositionBitmap creates an empty bitmap.
+func NewRoaringPositionBitmap() *RoaringPositionBitmap {
+       return &RoaringPositionBitmap{}
+}
+
+// Set marks a position in the bitmap.
+func (b *RoaringPositionBitmap) Set(pos int64) {
+       key := int(pos >> 32)
+       low := uint32(pos)
+       b.grow(key + 1)
+       b.bitmaps[key].Add(low)
+}
+
+// Contains checks if a position is set.
+func (b *RoaringPositionBitmap) Contains(pos int64) bool {

Review Comment:
   same, need >0 guard



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to