zeroshade commented on code in PR #866: URL: https://github.com/apache/iceberg-go/pull/866#discussion_r3082217171
########## table/dv/deletion_vector.go: ########## @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package dv + +import ( + "bytes" + "encoding/binary" + "fmt" + "hash/crc32" + + "github.com/apache/iceberg-go" + iceio "github.com/apache/iceberg-go/io" + "github.com/apache/iceberg-go/puffin" +) + +const ( + // DVMagicNumber is the magic number for deletion vectors. + // Spec bytes: D1 D3 39 64 (big-endian) = 0x6439D3D1 (little-endian uint32) + DVMagicNumber uint32 = 0x6439D3D1 Review Comment: Will defining the constant this way work for big-endian architectures or do we need to add a conditional compilation etc? Can we just use the actual uint32 value to avoid endian differences? ########## table/dv/deletion_vector.go: ########## @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package dv + +import ( + "bytes" + "encoding/binary" + "fmt" + "hash/crc32" + + "github.com/apache/iceberg-go" + iceio "github.com/apache/iceberg-go/io" + "github.com/apache/iceberg-go/puffin" +) + +const ( + // DVMagicNumber is the magic number for deletion vectors. + // Spec bytes: D1 D3 39 64 (big-endian) = 0x6439D3D1 (little-endian uint32) + DVMagicNumber uint32 = 0x6439D3D1 + + dvLengthSize = 4 // length field + dvMagicSize = 4 // magic field + dvCRCSize = 4 // CRC-32 checksum + dvMinSize = dvLengthSize + dvMagicSize + dvCRCSize +) + +// DeserializeDV parses a deletion vector blob and returns a bitmap of deleted positions. +// +// The DV binary format is: +// - Length (4 bytes, big-endian): size of magic + bitmap data +// - Magic (4 bytes, little-endian): must be 0x6439D3D1 +// - Bitmap (variable): roaring bitmap in Iceberg portable format +// - CRC-32 (4 bytes, big-endian): checksum over magic + bitmap +// +// If expectedCardinality >= 0, the bitmap's cardinality is validated against it. +func DeserializeDV(data []byte, expectedCardinality int64) (*RoaringPositionBitmap, error) { + if len(data) < dvMinSize { + return nil, fmt.Errorf("deletion vector payload too short: %d bytes (minimum %d)", len(data), dvMinSize) + } + + // 1. Read and validate length + length := binary.BigEndian.Uint32(data[0:dvLengthSize]) + expectedLength := uint32(len(data) - dvLengthSize - dvCRCSize) + if length != expectedLength { + return nil, fmt.Errorf("deletion vector length mismatch: got %d, expected %d", length, expectedLength) + } + + // 2. 
Read and validate magic + magic := binary.LittleEndian.Uint32(data[dvLengthSize : dvLengthSize+dvMagicSize]) + if magic != DVMagicNumber { + return nil, fmt.Errorf("invalid deletion vector magic: 0x%08x, expected 0x%08x", magic, DVMagicNumber) + } + + // 3. Verify CRC-32 over magic + bitmap (bytes 4 to len-4) + bitmapDataStart := dvLengthSize + bitmapDataEnd := len(data) - dvCRCSize + computedCRC := crc32.ChecksumIEEE(data[bitmapDataStart:bitmapDataEnd]) + expectedCRC := binary.BigEndian.Uint32(data[bitmapDataEnd:]) + if computedCRC != expectedCRC { + return nil, fmt.Errorf("deletion vector CRC mismatch: computed 0x%08x, expected 0x%08x", computedCRC, expectedCRC) + } + + // 4. Deserialize roaring bitmap from the inner bytes (after length + magic, before CRC) + roaringStart := dvLengthSize + dvMagicSize + bitmap, err := DeserializeRoaringPositionBitmap(bytes.NewReader(data[roaringStart:bitmapDataEnd])) + if err != nil { + return nil, fmt.Errorf("deserialize deletion vector bitmap: %w", err) + } + + // 5. Validate cardinality if requested + if expectedCardinality >= 0 { + actual := bitmap.Cardinality() + if actual != expectedCardinality { + return nil, fmt.Errorf("deletion vector cardinality mismatch: got %d, expected %d", actual, expectedCardinality) + } + } + + return bitmap, nil +} + +// ReadDV reads a deletion vector from a puffin file using the manifest entry metadata. +// ContentOffset and ContentSizeInBytes must be set on the DataFile (required by v3 spec). 
+func ReadDV(fs iceio.IO, dvFile iceberg.DataFile) (*RoaringPositionBitmap, error) { + if dvFile.FileFormat() != iceberg.PuffinFile { + return nil, fmt.Errorf("expected PUFFIN format for deletion vector, got %s", dvFile.FileFormat()) + } + + if dvFile.ContentOffset() == nil || dvFile.ContentSizeInBytes() == nil { + return nil, fmt.Errorf("DV file %s missing ContentOffset/ContentSizeInBytes", dvFile.FilePath()) + } + + size := *dvFile.ContentSizeInBytes() + if size < 0 || size > int64(puffin.DefaultMaxBlobSize) { + return nil, fmt.Errorf("DV blob size %d out of valid range [0, %d]", size, puffin.DefaultMaxBlobSize) + } + + f, err := fs.Open(dvFile.FilePath()) + if err != nil { + return nil, fmt.Errorf("open DV file %s: %w", dvFile.FilePath(), err) + } + defer f.Close() + + reader, err := puffin.NewReader(f) Review Comment: does the puffin reader need a `defer reader.Close()`? ########## table/dv/roaring_bitmap.go: ########## @@ -0,0 +1,178 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package dv + +import ( + "encoding/binary" + "fmt" + "io" + "slices" + "sort" + + "github.com/RoaringBitmap/roaring/v2" + "github.com/apache/iceberg-go/puffin" +) + +// maxBitmapCount is the maximum number of 32-bit bitmap keys allowed during +// deserialization. This prevents CPU/memory exhaustion from absurd counts +// in malformed input. Derived from puffin.DefaultMaxBlobSize / 8 (minimum +// per-bitmap overhead: 4-byte key + at least 4 bytes of roaring data). +var maxBitmapCount = int64(puffin.DefaultMaxBlobSize / 8) + +// RoaringPositionBitmap supports 64-bit positions using a sparse map of +// 32-bit Roaring bitmaps. Positions are split into a 32-bit key +// (high bits) and 32-bit value (low bits). +// +// Compatible with the Java Iceberg RoaringPositionBitmap serialization format. +type RoaringPositionBitmap struct { + bitmaps map[uint32]*roaring.Bitmap +} + +// NewRoaringPositionBitmap creates an empty bitmap. +func NewRoaringPositionBitmap() *RoaringPositionBitmap { + return &RoaringPositionBitmap{ + bitmaps: make(map[uint32]*roaring.Bitmap), + } +} + +// Set marks a position in the bitmap. +// Position must be non-negative. +func (b *RoaringPositionBitmap) Set(pos int64) { + if pos < 0 { + return + } + key := uint32(pos >> 32) + low := uint32(pos) + bm, ok := b.bitmaps[key] + if !ok { + bm = roaring.New() + b.bitmaps[key] = bm + } + bm.Add(low) +} + +// Contains checks if a position is set. +func (b *RoaringPositionBitmap) Contains(pos int64) bool { + if pos < 0 { + return false + } + key := uint32(pos >> 32) + low := uint32(pos) + bm, ok := b.bitmaps[key] + if !ok { + return false + } + + return bm.Contains(low) +} + +// IsEmpty returns true if no positions are set. +func (b *RoaringPositionBitmap) IsEmpty() bool { + return b.Cardinality() == 0 +} + +// Cardinality returns the total number of set positions. 
+func (b *RoaringPositionBitmap) Cardinality() int64 { + var c int64 + for _, bm := range b.bitmaps { + c += int64(bm.GetCardinality()) + } + + return c +} + +// Serialize writes in the Iceberg portable format (little-endian): +// - bitmap count (8 bytes, LE): number of non-empty bitmaps +// - for each bitmap in ascending key order: key (4 bytes, LE) + roaring portable data +// +// Only non-empty bitmaps are written, matching Java Iceberg behavior. +func (b *RoaringPositionBitmap) Serialize(w io.Writer) error { + keys := make([]uint32, 0, len(b.bitmaps)) + for k, bm := range b.bitmaps { + if bm.GetCardinality() > 0 { + keys = append(keys, k) + } + } + sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] }) + + if err := binary.Write(w, binary.LittleEndian, int64(len(keys))); err != nil { + return fmt.Errorf("write bitmap count: %w", err) + } + for _, key := range keys { + if err := binary.Write(w, binary.LittleEndian, key); err != nil { + return fmt.Errorf("write key %d: %w", key, err) + } + if _, err := b.bitmaps[key].WriteTo(w); err != nil { + return fmt.Errorf("write bitmap %d: %w", key, err) + } + } + + return nil +} + +// DeserializeRoaringPositionBitmap reads a bitmap from the Iceberg portable format. 
+// Format: [count] { [key][bitmap] } .....{[key_n][bitmap_n]} +func DeserializeRoaringPositionBitmap(r io.Reader) (*RoaringPositionBitmap, error) { + var count int64 + if err := binary.Read(r, binary.LittleEndian, &count); err != nil { + return nil, fmt.Errorf("read bitmap count: %w", err) + } + if count < 0 { + return nil, fmt.Errorf("invalid bitmap count: %d", count) + } + if count > maxBitmapCount { + return nil, fmt.Errorf("bitmap count %d exceeds maximum allowed %d", count, maxBitmapCount) + } + + b := &RoaringPositionBitmap{ + bitmaps: make(map[uint32]*roaring.Bitmap, count), + } + var lastKey uint32 + hasLastKey := false + + for i := int64(0); i < count; i++ { + var key uint32 + if err := binary.Read(r, binary.LittleEndian, &key); err != nil { + return nil, fmt.Errorf("read key %d: %w", i, err) + } + if hasLastKey && key <= lastKey { + return nil, fmt.Errorf("keys must be ascending: got %d after %d", key, lastKey) + } + + bm := roaring.New() + if _, err := bm.ReadFrom(r); err != nil { + return nil, fmt.Errorf("read bitmap for key %d: %w", key, err) + } + b.bitmaps[key] = bm + lastKey = key + hasLastKey = true + } + + return b, nil +} + +// sortedKeys returns the bitmap keys in ascending order. +func (b *RoaringPositionBitmap) sortedKeys() []uint32 { + keys := make([]uint32, 0, len(b.bitmaps)) + for k := range b.bitmaps { + keys = append(keys, k) + } + slices.Sort(keys) + + return keys Review Comment: ```suggestion return slices.Sorted(maps.Keys(b.bitmaps)) ``` ########## table/dv/roaring_bitmap.go: ########## @@ -0,0 +1,178 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package dv + +import ( + "encoding/binary" + "fmt" + "io" + "slices" + "sort" + + "github.com/RoaringBitmap/roaring/v2" + "github.com/apache/iceberg-go/puffin" +) + +// maxBitmapCount is the maximum number of 32-bit bitmap keys allowed during +// deserialization. This prevents CPU/memory exhaustion from absurd counts +// in malformed input. Derived from puffin.DefaultMaxBlobSize / 8 (minimum +// per-bitmap overhead: 4-byte key + at least 4 bytes of roaring data). +var maxBitmapCount = int64(puffin.DefaultMaxBlobSize / 8) + +// RoaringPositionBitmap supports 64-bit positions using a sparse map of +// 32-bit Roaring bitmaps. Positions are split into a 32-bit key +// (high bits) and 32-bit value (low bits). +// +// Compatible with the Java Iceberg RoaringPositionBitmap serialization format. +type RoaringPositionBitmap struct { + bitmaps map[uint32]*roaring.Bitmap +} + +// NewRoaringPositionBitmap creates an empty bitmap. +func NewRoaringPositionBitmap() *RoaringPositionBitmap { + return &RoaringPositionBitmap{ + bitmaps: make(map[uint32]*roaring.Bitmap), + } +} + +// Set marks a position in the bitmap. +// Position must be non-negative. +func (b *RoaringPositionBitmap) Set(pos int64) { + if pos < 0 { + return + } + key := uint32(pos >> 32) + low := uint32(pos) + bm, ok := b.bitmaps[key] + if !ok { + bm = roaring.New() + b.bitmaps[key] = bm + } + bm.Add(low) +} + +// Contains checks if a position is set. 
+func (b *RoaringPositionBitmap) Contains(pos int64) bool { + if pos < 0 { + return false + } + key := uint32(pos >> 32) + low := uint32(pos) + bm, ok := b.bitmaps[key] + if !ok { + return false + } + + return bm.Contains(low) +} + +// IsEmpty returns true if no positions are set. +func (b *RoaringPositionBitmap) IsEmpty() bool { + return b.Cardinality() == 0 +} + +// Cardinality returns the total number of set positions. +func (b *RoaringPositionBitmap) Cardinality() int64 { + var c int64 + for _, bm := range b.bitmaps { + c += int64(bm.GetCardinality()) + } + + return c +} + +// Serialize writes in the Iceberg portable format (little-endian): +// - bitmap count (8 bytes, LE): number of non-empty bitmaps +// - for each bitmap in ascending key order: key (4 bytes, LE) + roaring portable data +// +// Only non-empty bitmaps are written, matching Java Iceberg behavior. +func (b *RoaringPositionBitmap) Serialize(w io.Writer) error { + keys := make([]uint32, 0, len(b.bitmaps)) + for k, bm := range b.bitmaps { + if bm.GetCardinality() > 0 { + keys = append(keys, k) + } + } + sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] }) + + if err := binary.Write(w, binary.LittleEndian, int64(len(keys))); err != nil { + return fmt.Errorf("write bitmap count: %w", err) + } + for _, key := range keys { + if err := binary.Write(w, binary.LittleEndian, key); err != nil { + return fmt.Errorf("write key %d: %w", key, err) + } + if _, err := b.bitmaps[key].WriteTo(w); err != nil { + return fmt.Errorf("write bitmap %d: %w", key, err) + } + } + + return nil +} + +// DeserializeRoaringPositionBitmap reads a bitmap from the Iceberg portable format. 
+// Format: [count] { [key][bitmap] } .....{[key_n][bitmap_n]} +func DeserializeRoaringPositionBitmap(r io.Reader) (*RoaringPositionBitmap, error) { + var count int64 + if err := binary.Read(r, binary.LittleEndian, &count); err != nil { + return nil, fmt.Errorf("read bitmap count: %w", err) + } + if count < 0 { + return nil, fmt.Errorf("invalid bitmap count: %d", count) + } + if count > maxBitmapCount { + return nil, fmt.Errorf("bitmap count %d exceeds maximum allowed %d", count, maxBitmapCount) + } + + b := &RoaringPositionBitmap{ + bitmaps: make(map[uint32]*roaring.Bitmap, count), + } + var lastKey uint32 + hasLastKey := false + + for i := int64(0); i < count; i++ { Review Comment: ```suggestion for i := range count { ``` ########## manifest.go: ########## @@ -2123,10 +2126,10 @@ func NewDataFileBuilder( return nil, fmt.Errorf("%w: path cannot be empty", ErrInvalidArgument) } - if format != AvroFile && format != OrcFile && format != ParquetFile { + if format != AvroFile && format != OrcFile && format != ParquetFile && format != PuffinFile { Review Comment: Agreed ########## table/dv/roaring_bitmap.go: ########## @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package table + +import ( + "encoding/binary" + "fmt" + "io" + + "github.com/RoaringBitmap/roaring/v2" +) + +// RoaringPositionBitmap supports 64-bit positions using an array of +// 32-bit Roaring bitmaps. Positions are split into a 32-bit key +// (high bits) and 32-bit value (low bits). +// +// Compatible with the Java Iceberg RoaringPositionBitmap serialization format. +type RoaringPositionBitmap struct { + bitmaps []*roaring.Bitmap // index = high 32 bits (key) +} + +// NewRoaringPositionBitmap creates an empty bitmap. +func NewRoaringPositionBitmap() *RoaringPositionBitmap { + return &RoaringPositionBitmap{} +} + +// Set marks a position in the bitmap. +func (b *RoaringPositionBitmap) Set(pos int64) { + key := int(pos >> 32) + low := uint32(pos) + b.grow(key + 1) + b.bitmaps[key].Add(low) +} + +// Contains checks if a position is set. +func (b *RoaringPositionBitmap) Contains(pos int64) bool { + key := int(pos >> 32) + low := uint32(pos) + if key >= len(b.bitmaps) { + return false + } + + return b.bitmaps[key].Contains(low) +} + +// IsEmpty returns true if no positions are set. +func (b *RoaringPositionBitmap) IsEmpty() bool { + return b.Cardinality() == 0 +} + +// Cardinality returns the total number of set positions. +func (b *RoaringPositionBitmap) Cardinality() int64 { + var c int64 + for _, bm := range b.bitmaps { + c += int64(bm.GetCardinality()) + } + + return c +} + +// Serialize writes in the Iceberg portable format (little-endian): +// - bitmap count (8 bytes, LE) +// - for each bitmap: key (4 bytes, LE) + roaring portable data +func (b *RoaringPositionBitmap) Serialize(w io.Writer) error { Review Comment: Agreed ########## table/dv/deletion_vector.go: ########## @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package dv + +import ( + "bytes" + "encoding/binary" + "fmt" + "hash/crc32" + + "github.com/apache/iceberg-go" + iceio "github.com/apache/iceberg-go/io" + "github.com/apache/iceberg-go/puffin" +) + +const ( + // DVMagicNumber is the magic number for deletion vectors. + // Spec bytes: D1 D3 39 64 (big-endian) = 0x6439D3D1 (little-endian uint32) + DVMagicNumber uint32 = 0x6439D3D1 + + dvLengthSize = 4 // length field + dvMagicSize = 4 // magic field + dvCRCSize = 4 // CRC-32 checksum + dvMinSize = dvLengthSize + dvMagicSize + dvCRCSize +) + +// DeserializeDV parses a deletion vector blob and returns a bitmap of deleted positions. +// +// The DV binary format is: +// - Length (4 bytes, big-endian): size of magic + bitmap data Review Comment: does the length include the CRC32 bytes also? ########## table/dv/deletion_vector.go: ########## @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package dv + +import ( + "bytes" + "encoding/binary" + "fmt" + "hash/crc32" + + "github.com/apache/iceberg-go" + iceio "github.com/apache/iceberg-go/io" + "github.com/apache/iceberg-go/puffin" +) + +const ( + // DVMagicNumber is the magic number for deletion vectors. + // Spec bytes: D1 D3 39 64 (big-endian) = 0x6439D3D1 (little-endian uint32) + DVMagicNumber uint32 = 0x6439D3D1 + + dvLengthSize = 4 // length field + dvMagicSize = 4 // magic field + dvCRCSize = 4 // CRC-32 checksum + dvMinSize = dvLengthSize + dvMagicSize + dvCRCSize +) + +// DeserializeDV parses a deletion vector blob and returns a bitmap of deleted positions. +// +// The DV binary format is: +// - Length (4 bytes, big-endian): size of magic + bitmap data +// - Magic (4 bytes, little-endian): must be 0x6439D3D1 +// - Bitmap (variable): roaring bitmap in Iceberg portable format +// - CRC-32 (4 bytes, big-endian): checksum over magic + bitmap Review Comment: it's super weird that the spec jumps back and forth between big and little endian representations.... ########## table/dv/roaring_bitmap.go: ########## @@ -0,0 +1,178 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package dv + +import ( + "encoding/binary" + "fmt" + "io" + "slices" + "sort" + + "github.com/RoaringBitmap/roaring/v2" + "github.com/apache/iceberg-go/puffin" +) + +// maxBitmapCount is the maximum number of 32-bit bitmap keys allowed during +// deserialization. This prevents CPU/memory exhaustion from absurd counts +// in malformed input. Derived from puffin.DefaultMaxBlobSize / 8 (minimum +// per-bitmap overhead: 4-byte key + at least 4 bytes of roaring data). +var maxBitmapCount = int64(puffin.DefaultMaxBlobSize / 8) + +// RoaringPositionBitmap supports 64-bit positions using a sparse map of +// 32-bit Roaring bitmaps. Positions are split into a 32-bit key +// (high bits) and 32-bit value (low bits). +// +// Compatible with the Java Iceberg RoaringPositionBitmap serialization format. +type RoaringPositionBitmap struct { + bitmaps map[uint32]*roaring.Bitmap +} + +// NewRoaringPositionBitmap creates an empty bitmap. +func NewRoaringPositionBitmap() *RoaringPositionBitmap { + return &RoaringPositionBitmap{ + bitmaps: make(map[uint32]*roaring.Bitmap), + } +} + +// Set marks a position in the bitmap. +// Position must be non-negative. +func (b *RoaringPositionBitmap) Set(pos int64) { + if pos < 0 { + return + } + key := uint32(pos >> 32) + low := uint32(pos) + bm, ok := b.bitmaps[key] + if !ok { + bm = roaring.New() + b.bitmaps[key] = bm + } + bm.Add(low) +} + +// Contains checks if a position is set. 
+func (b *RoaringPositionBitmap) Contains(pos int64) bool { + if pos < 0 { + return false + } + key := uint32(pos >> 32) + low := uint32(pos) + bm, ok := b.bitmaps[key] + if !ok { + return false + } + + return bm.Contains(low) +} + +// IsEmpty returns true if no positions are set. +func (b *RoaringPositionBitmap) IsEmpty() bool { + return b.Cardinality() == 0 +} + +// Cardinality returns the total number of set positions. +func (b *RoaringPositionBitmap) Cardinality() int64 { + var c int64 + for _, bm := range b.bitmaps { + c += int64(bm.GetCardinality()) + } + + return c +} + +// Serialize writes in the Iceberg portable format (little-endian): +// - bitmap count (8 bytes, LE): number of non-empty bitmaps +// - for each bitmap in ascending key order: key (4 bytes, LE) + roaring portable data +// +// Only non-empty bitmaps are written, matching Java Iceberg behavior. +func (b *RoaringPositionBitmap) Serialize(w io.Writer) error { + keys := make([]uint32, 0, len(b.bitmaps)) + for k, bm := range b.bitmaps { + if bm.GetCardinality() > 0 { + keys = append(keys, k) + } + } + sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] }) + + if err := binary.Write(w, binary.LittleEndian, int64(len(keys))); err != nil { + return fmt.Errorf("write bitmap count: %w", err) + } + for _, key := range keys { + if err := binary.Write(w, binary.LittleEndian, key); err != nil { + return fmt.Errorf("write key %d: %w", key, err) + } + if _, err := b.bitmaps[key].WriteTo(w); err != nil { + return fmt.Errorf("write bitmap %d: %w", key, err) + } + } + + return nil +} + +// DeserializeRoaringPositionBitmap reads a bitmap from the Iceberg portable format. 
+// Format: [count] { [key][bitmap] } .....{[key_n][bitmap_n]} +func DeserializeRoaringPositionBitmap(r io.Reader) (*RoaringPositionBitmap, error) { Review Comment: If the only current use for this is always going to have the whole byte slice and then have to wrap it with `bytes.NewReader`, why not just take a byte slice here instead and avoid the trip through `io.Reader`? ########## table/dv/roaring_bitmap.go: ########## @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "encoding/binary" + "fmt" + "io" + + "github.com/RoaringBitmap/roaring/v2" +) + +// RoaringPositionBitmap supports 64-bit positions using an array of +// 32-bit Roaring bitmaps. Positions are split into a 32-bit key +// (high bits) and 32-bit value (low bits). +// +// Compatible with the Java Iceberg RoaringPositionBitmap serialization format. +type RoaringPositionBitmap struct { + bitmaps []*roaring.Bitmap // index = high 32 bits (key) +} + +// NewRoaringPositionBitmap creates an empty bitmap. +func NewRoaringPositionBitmap() *RoaringPositionBitmap { + return &RoaringPositionBitmap{} +} + +// Set marks a position in the bitmap.
+func (b *RoaringPositionBitmap) Set(pos int64) { + key := int(pos >> 32) + low := uint32(pos) + b.grow(key + 1) + b.bitmaps[key].Add(low) +} + +// Contains checks if a position is set. +func (b *RoaringPositionBitmap) Contains(pos int64) bool { Review Comment: Same question as for `Set`: can we just use `uint64` to alleviate the need for the check? ########## table/dv/roaring_bitmap.go: ########## @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "encoding/binary" + "fmt" + "io" + + "github.com/RoaringBitmap/roaring/v2" +) + +// RoaringPositionBitmap supports 64-bit positions using an array of +// 32-bit Roaring bitmaps. Positions are split into a 32-bit key +// (high bits) and 32-bit value (low bits). +// +// Compatible with the Java Iceberg RoaringPositionBitmap serialization format. +type RoaringPositionBitmap struct { + bitmaps []*roaring.Bitmap // index = high 32 bits (key) +} + +// NewRoaringPositionBitmap creates an empty bitmap. +func NewRoaringPositionBitmap() *RoaringPositionBitmap { + return &RoaringPositionBitmap{} +} + +// Set marks a position in the bitmap.
+func (b *RoaringPositionBitmap) Set(pos int64) { Review Comment: any reason we can't just use `uint64` to alleviate the need for the check? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
