This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 87c1958f feat(table): scanner deletion-vector read path (#1053)
87c1958f is described below
commit 87c1958ffac77956e00cfdf868dc7916dc51aed9
Author: Andrei Tserakhau <[email protected]>
AuthorDate: Tue May 26 18:47:26 2026 +0200
feat(table): scanner deletion-vector read path (#1053)
Closes #996. Drops the ErrNotImplemented guard in GetRecords and reads
DVs into a per-data-file bitmap map (perFileDVBitmaps) kept separate from
the Parquet pos-delete map; recordsFromTask receives the bitmap alongside
any pos-deletes and drops DV rows with a keep-mask filter. Dedup is keyed by
referenced data file (one puffin file can carry multiple blobs);
distinct DVs for the same data file are rejected. PlanFiles also stops
pairing pos-delete files with a data file when a DV applies, per spec.
Tests build the puffin in-process so this isn't blocked on #1041.
Adjacent gaps deferred to follow-ups:
- #1048 (puffin DV header check)
- #1049 (ReadDV cardinality)
- #1050 (PlanFiles DV seq + multi-DV reject)
- #1051 (GetRecords arrow leak)
- #1052 (end-to-end DV scan test).
---
table/arrow_scanner.go | 233 ++++++++++++++++++++++++----
table/dv/roaring_bitmap.go | 85 ++++++++++-
table/dv/roaring_bitmap_test.go | 159 ++++++++++++++++++-
table/dv_scanner_read_test.go | 329 ++++++++++++++++++++++++++++++++++++++++
table/scanner.go | 18 ++-
5 files changed, 784 insertions(+), 40 deletions(-)
diff --git a/table/arrow_scanner.go b/table/arrow_scanner.go
index 259d5ecd..968201d9 100644
--- a/table/arrow_scanner.go
+++ b/table/arrow_scanner.go
@@ -33,6 +33,7 @@ import (
"github.com/apache/iceberg-go"
iceinternal "github.com/apache/iceberg-go/internal"
iceio "github.com/apache/iceberg-go/io"
+ "github.com/apache/iceberg-go/table/dv"
"github.com/apache/iceberg-go/table/internal"
"github.com/apache/iceberg-go/table/substrait"
"github.com/substrait-io/substrait-go/v8/expr"
@@ -70,11 +71,8 @@ func releasePerFilePosDeletes(deletesPerFile
perFilePosDeletes) {
}
func readAllDeleteFiles(ctx context.Context, fs iceio.IO, tasks
[]FileScanTask, concurrency int) (perFilePosDeletes, error) {
- var (
- deletesPerFile = make(perFilePosDeletes)
- uniqueDeletes = make(map[string]iceberg.DataFile)
- err error
- )
+ deletesPerFile := make(perFilePosDeletes)
+ uniqueDeletes := make(map[string]iceberg.DataFile)
for _, t := range tasks {
for _, d := range t.DeleteFiles {
@@ -92,24 +90,33 @@ func readAllDeleteFiles(ctx context.Context, fs iceio.IO,
tasks []FileScanTask,
return deletesPerFile, nil
}
- g, ctx := errgroup.WithContext(ctx)
+ g, gctx := errgroup.WithContext(ctx)
g.SetLimit(concurrency)
perFileChan := make(chan map[string]*arrow.Chunked, concurrency)
go func() {
+ // Inner g.Wait() gates the deferred channel close so a late
worker
+ // can't send on a closed channel. Outer g.Wait() below
collects the
+ // error — no cross-goroutine shared err variable.
defer close(perFileChan)
for _, v := range uniqueDeletes {
g.Go(func() error {
- deletes, err := readDeletes(ctx, fs, v)
- if deletes != nil {
- perFileChan <- deletes
+ deletes, err := readDeletes(gctx, fs, v)
+ if err != nil {
+ return err
+ }
+ if deletes == nil {
+ return nil
+ }
+ select {
+ case perFileChan <- deletes:
+ return nil
+ case <-gctx.Done():
+ return gctx.Err()
}
-
- return err
})
}
-
- err = g.Wait()
+ _ = g.Wait()
}()
for deletes := range perFileChan {
@@ -118,7 +125,130 @@ func readAllDeleteFiles(ctx context.Context, fs iceio.IO,
tasks []FileScanTask,
}
}
- return deletesPerFile, err
+ if err := g.Wait(); err != nil {
+ return nil, err
+ }
+
+ return deletesPerFile, nil
+}
+
+// perFileDVBitmaps maps each data-file path to the deletion-vector bitmap
+// that applies to it. Kept separate from perFilePosDeletes so the row-filter
+// pipeline can apply a bit-packed keep-mask (KeepMaskBytes) with
+// compute.FilterRecordBatch instead of materializing positions into a set[int64] + Take.
+type perFileDVBitmaps = map[string]*dv.RoaringPositionBitmap
+
+// readAllDeletionVectors reads every deletion-vector puffin blob referenced
+// by the input tasks and returns a perFileDVBitmaps map keyed by the
+// referenced data-file path.
+//
+// Dedup is by referenced-data-file path, not by puffin file path: a single
+// puffin file can carry multiple DV blobs (one per data file). Keying by the
+// puffin path would silently drop all but the first blob. This matches Java's
+// DeleteFileIndex.findDV, which keys by data-file path. As a side-effect we
+// can detect spec violations: two distinct DV blobs targeting the same data
+// file is rejected (mirrors Java's "Can't index multiple DVs for %s"
+// ValidationException — over-deletion risk if silently unioned).
+//
+// Validation happens up front, before any goroutines are launched, so the
+// goroutine fan-out has no early-exit path. (An early return after g.Go
+// dispatches but before g.Wait would close resultsChan while in-flight
+// workers were still sending, panicking with "send on closed channel".)
+func readAllDeletionVectors(ctx context.Context, fs iceio.IO, tasks
[]FileScanTask, concurrency int) (perFileDVBitmaps, error) {
+ out := make(perFileDVBitmaps)
+ uniqueDVs := make(map[string]iceberg.DataFile)
+
+ for _, t := range tasks {
+ for _, d := range t.DeletionVectorFiles {
+ ref := d.ReferencedDataFile()
+ if ref == nil {
+ return nil, fmt.Errorf("deletion vector %s
missing referenced_data_file", d.FilePath())
+ }
+ if d.ContentOffset() == nil || d.ContentSizeInBytes()
== nil {
+ // Spec §Manifest Files: content_offset and
content_size_in_
+ // bytes are required for DV entries. Surface
the missing-
+ // field cause directly here — otherwise the
dedup check
+ // below would produce the misleading "multiple
deletion
+ // vectors" error when two equally-broken
entries collide.
+ return nil, fmt.Errorf("deletion vector %s
missing content_offset/content_size_in_bytes", d.FilePath())
+ }
+ if existing, seen := uniqueDVs[*ref]; seen {
+ if !sameDVBlob(existing, d) {
+ return nil, fmt.Errorf(
+ "multiple deletion vectors for
data file %s: %s and %s",
+ *ref, existing.FilePath(),
d.FilePath())
+ }
+
+ continue
+ }
+ uniqueDVs[*ref] = d
+ }
+ }
+
+ if len(uniqueDVs) == 0 {
+ return out, nil
+ }
+
+ type dvResult struct {
+ referencedDataFile string
+ bitmap *dv.RoaringPositionBitmap
+ }
+
+ g, gctx := errgroup.WithContext(ctx)
+ g.SetLimit(concurrency)
+
+ resultsChan := make(chan dvResult, concurrency)
+ go func() {
+ // g.Wait() before the deferred close so workers finish sending
+ // before the channel is closed — otherwise a late worker would
+ // panic on send to a closed channel. The error is collected via
+ // the outer g.Wait() below, not stored on a shared variable.
+ defer close(resultsChan)
+ for ref, dvFile := range uniqueDVs {
+ g.Go(func() error {
+ bitmap, err := dv.ReadDV(fs, dvFile)
+ if err != nil {
+ return fmt.Errorf("read deletion vector
%s: %w", dvFile.FilePath(), err)
+ }
+ select {
+ case resultsChan <-
dvResult{referencedDataFile: ref, bitmap: bitmap}:
+ return nil
+ case <-gctx.Done():
+ return gctx.Err()
+ }
+ })
+ }
+ _ = g.Wait()
+ }()
+
+ for r := range resultsChan {
+ out[r.referencedDataFile] = r.bitmap
+ }
+
+ if err := g.Wait(); err != nil {
+ return nil, err
+ }
+
+ return out, nil
+}
+
+// sameDVBlob reports whether two DV manifest entries point at the same puffin
+// blob — identical puffin file path and content offset. Different of either
+// means two distinct DVs for the same data file, the over-deletion case
+// readAllDeletionVectors rejects.
+//
+// Java's DeleteFileIndex is stricter: any second DV for the same data file is
+// rejected with ValidationException("Can't index multiple DVs for %s"), even
+// when both entries reference the same underlying blob. Same-blob dedup here
+// is a deliberate divergence — reading the same blob twice is wasteful, not
+// incorrect. ContentOffset is required to be non-nil by the spec and by the
+// pre-pass in readAllDeletionVectors, so the comparison below assumes both.
+func sameDVBlob(a, b iceberg.DataFile) bool {
+ if a.FilePath() != b.FilePath() {
+ return false
+ }
+
+ return *a.ContentOffset() == *b.ContentOffset()
}
func readDeletes(ctx context.Context, fs iceio.IO, dataFile iceberg.DataFile)
(_ map[string]*arrow.Chunked, err error) {
@@ -211,6 +341,43 @@ func processPositionalDeletes(ctx context.Context, deletes
set[int64]) recProces
}
}
+// filterByDeletionVector returns a pipeline step that drops rows present in
+// the bitmap by precomputing a bit-packed keep-mask covering the whole file
+// once and slicing the relevant range per batch into
compute.FilterRecordBatch.
+//
+// The mask layout matches Arrow's Boolean buffer convention (LSB-first per
+// byte, little-endian word order), so dv.KeepMaskBytes ->
memory.NewBufferBytes
+// -> array.NewBoolean is a zero-copy wrap. The keepBits slice is Go-allocated
+// (GC-friendly) and shared across every per-batch Boolean array; each
+// array.NewBoolean / array.NewSlice pair is released after the batch.
+//
+// rowCount bounds the mask to the data file's row count. The closure-captured
+// nextIdx tracks absolute position across batches, mirroring
+// processPositionalDeletes.
+func filterByDeletionVector(ctx context.Context, bitmap
*dv.RoaringPositionBitmap, rowCount int64) recProcessFn {
+ nextIdx := int64(0)
+ keepBits := bitmap.KeepMaskBytes(rowCount)
+ buf := memory.NewBufferBytes(keepBits)
+
+ return func(r arrow.RecordBatch) (arrow.RecordBatch, error) {
+ defer r.Release()
+
+ currentIdx := nextIdx
+ nextIdx += r.NumRows()
+
+ // Wrap (and slice) the shared keep-mask buffer for this batch.
+ // array.NewSlice on a Boolean array tracks the bit-level
offset,
+ // so we don't need byte-aligned slicing — currentIdx can land
+ // anywhere within a byte.
+ full := array.NewBoolean(int(rowCount), buf, nil, 0)
+ defer full.Release()
+ sliced := array.NewSlice(full, currentIdx,
nextIdx).(*array.Boolean)
+ defer sliced.Release()
+
+ return compute.FilterRecordBatch(ctx, r, sliced,
compute.DefaultFilterOptions())
+ }
+}
+
// enrichRecordsWithPosDeleteFields enriches a RecordBatch with the columns
declared in the PositionalDeleteArrowSchema
// so that during the pipeline filtering stages that sheds filtered out
records, we still have a way to
// preserve the original position of those records.
@@ -561,7 +728,7 @@ func (as *arrowScan) processRecords(
return err
}
-func (as *arrowScan) recordsFromTask(ctx context.Context, task
internal.Enumerated[FileScanTask], out chan<- enumeratedRecord,
positionalDeletes positionDeletes, eqDeleteSets []*equalityDeleteSet) (err
error) {
+func (as *arrowScan) recordsFromTask(ctx context.Context, task
internal.Enumerated[FileScanTask], out chan<- enumeratedRecord,
positionalDeletes positionDeletes, dvBitmap *dv.RoaringPositionBitmap,
eqDeleteSets []*equalityDeleteSet) (err error) {
defer func() {
if err != nil {
out <- enumeratedRecord{Task: task, Err: err}
@@ -582,7 +749,7 @@ func (as *arrowScan) recordsFromTask(ctx context.Context,
task internal.Enumerat
}
defer iceinternal.CheckedClose(rdr, &err)
- pipeline := make([]recProcessFn, 0, 2)
+ pipeline := make([]recProcessFn, 0, 3)
if len(positionalDeletes) > 0 {
deletes := set[int64]{}
for _, chunk := range positionalDeletes {
@@ -596,6 +763,14 @@ func (as *arrowScan) recordsFromTask(ctx context.Context,
task internal.Enumerat
pipeline = append(pipeline, processPositionalDeletes(ctx,
deletes))
}
+ // PlanFiles skips Parquet pos-delete matching for any data file that
+ // has a DV (per spec), so in practice the two are mutually exclusive
+ // per task. NOTE(review): a task carrying both is not handled correctly —
+ // this step indexes rows after pos-deletes dropped some, so positions drift.
+ if dvBitmap != nil && !dvBitmap.IsEmpty() {
+ pipeline = append(pipeline, filterByDeletionVector(ctx,
dvBitmap, task.Value.File.Count()))
+ }
+
if len(eqDeleteSets) > 0 {
eqFn, eqErr := processEqualityDeletes(ctx, eqDeleteSets)
if eqErr != nil {
@@ -822,7 +997,7 @@ func createIterator(ctx context.Context, numWorkers uint,
records <-chan enumera
}
}
-func (as *arrowScan) recordBatchesFromTasksAndDeletes(ctx context.Context,
tasks []FileScanTask, deletesPerFile perFilePosDeletes, eqDeleteSets
map[int][]*equalityDeleteSet) iter.Seq2[arrow.RecordBatch, error] {
+func (as *arrowScan) recordBatchesFromTasksAndDeletes(ctx context.Context,
tasks []FileScanTask, deletesPerFile perFilePosDeletes, dvBitmaps
perFileDVBitmaps, eqDeleteSets map[int][]*equalityDeleteSet)
iter.Seq2[arrow.RecordBatch, error] {
extSet := substrait.NewExtensionSet()
as.nameMapping = as.metadata.NameMapping()
@@ -847,8 +1022,10 @@ func (as *arrowScan) recordBatchesFromTasksAndDeletes(ctx
context.Context, tasks
return
}
+ filePath := task.Value.File.FilePath()
if err := as.recordsFromTask(ctx, task,
records,
-
deletesPerFile[task.Value.File.FilePath()],
+ deletesPerFile[filePath],
+ dvBitmaps[filePath],
eqDeleteSets[task.Index]); err
!= nil {
cancel(err)
@@ -876,14 +1053,6 @@ func (as *arrowScan) recordBatchesFromTasksAndDeletes(ctx
context.Context, tasks
}
func (as *arrowScan) GetRecords(ctx context.Context, tasks []FileScanTask)
(*arrow.Schema, iter.Seq2[arrow.RecordBatch, error], error) {
- for _, t := range tasks {
- if len(t.DeletionVectorFiles) > 0 {
- return nil, nil, fmt.Errorf(
- "%w: deletion vector read is not yet
implemented, data file: %s has %d deletion vector(s)",
- iceberg.ErrNotImplemented, t.File.FilePath(),
len(t.DeletionVectorFiles))
- }
- }
-
var err error
as.useLargeTypes, err =
strconv.ParseBool(as.options.Get(ScanOptionArrowUseLargeTypes, "false"))
if err != nil {
@@ -910,6 +1079,16 @@ func (as *arrowScan) GetRecords(ctx context.Context,
tasks []FileScanTask) (*arr
return nil, nil, err
}
+ // DV bitmaps stay in their native form rather than being materialized
+ // into int64 positions and merged with the Parquet pos-delete map.
+ // filterByDeletionVector expands each bitmap once into a bit-packed
+ // keep-mask (KeepMaskBytes) and applies it per batch via
+ // compute.FilterRecordBatch — no intermediate position set.
+ dvBitmaps, err := readAllDeletionVectors(ctx, as.fs, tasks,
as.concurrency)
+ if err != nil {
+ return nil, nil, err
+ }
+
eqDeleteSets, err := readAllEqualityDeleteFiles(ctx, as.fs,
as.metadata.CurrentSchema(), tasks, as.concurrency)
if err != nil {
@@ -919,5 +1098,5 @@ func (as *arrowScan) GetRecords(ctx context.Context, tasks
[]FileScanTask) (*arr
return nil, nil, err
}
- return resultSchema, as.recordBatchesFromTasksAndDeletes(ctx, tasks,
deletesPerFile, eqDeleteSets), nil
+ return resultSchema, as.recordBatchesFromTasksAndDeletes(ctx, tasks,
deletesPerFile, dvBitmaps, eqDeleteSets), nil
}
diff --git a/table/dv/roaring_bitmap.go b/table/dv/roaring_bitmap.go
index cb25844a..a51ddc51 100644
--- a/table/dv/roaring_bitmap.go
+++ b/table/dv/roaring_bitmap.go
@@ -22,11 +22,11 @@ import (
"encoding/binary"
"fmt"
"io"
- "maps"
- "slices"
"sort"
"github.com/RoaringBitmap/roaring/v2"
+ "github.com/apache/arrow-go/v18/arrow/bitutil"
+ "github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/iceberg-go/puffin"
)
@@ -76,7 +76,11 @@ func (b *RoaringPositionBitmap) Contains(pos uint64) bool {
return bm.Contains(low)
}
-// IsEmpty returns true if no positions are set.
+// IsEmpty returns true if no positions are set. Returns true both for the
+// no-bucket case and for the (currently impossible-via-public-API) case
+// where a bucket exists but its inner roaring bitmap has zero cardinality
+// — the latter only matters if a future Remove-style method ever lets a
+// bucket drop to empty without being deleted from the map.
func (b *RoaringPositionBitmap) IsEmpty() bool {
return b.Cardinality() == 0
}
@@ -160,7 +164,76 @@ func DeserializeRoaringPositionBitmap(data []byte)
(*RoaringPositionBitmap, erro
return b, nil
}
-// sortedKeys returns the bitmap keys in ascending order.
-func (b *RoaringPositionBitmap) sortedKeys() []uint32 {
- return slices.Sorted(maps.Keys(b.bitmaps))
+// KeepMaskBytes returns a bit-packed []byte of length ⌈length/8⌉ where bit i
+// (LSB-first within a byte) is 1 iff position i is NOT in the bitmap. The
+// layout matches Arrow Boolean buffer convention so callers can wrap the
+// result via memory.NewBufferBytes / array.NewBoolean without re-packing.
+//
+// length bounds the range of positions the caller cares about — typically the
+// data file's row count. Bits past length-1 in the final byte are cleared.
+// Positions in the bitmap that fall outside [0, length) are ignored, so a
+// caller can safely pass a length smaller than the bitmap's max position
+// (e.g. when the file row count is below a stale upper bound).
+//
+// Bucket-key arithmetic is exact: each 32-bit bucket covers exactly 2^32
+// positions, so per-bucket bit offsets are 8-byte-aligned and the writer can
+// pack inverted dense words straight in. bitutil.BitmapWordWriter handles
+// host-endianness internally (PutNextWord LE-packs regardless of platform),
+// so the helper is portable on any GOARCH.
+func (b *RoaringPositionBitmap) KeepMaskBytes(length int64) []byte {
+ if length <= 0 {
+ return nil
+ }
+ out := make([]byte, bitutil.BytesForBits(length))
+ // Pre-fill ALL bits to 1 (keep) before any bucket write.
BitmapWordWriter
+ // does full-word stores (not read-modify-write), so the un-deleted bits
+ // in each 8-byte word land at 1 only because this initialization
+ // happened. A future refactor that moves bucket writes before the
+ // memory.Set, or removes it entirely, would silently corrupt the mask.
+ memory.Set(out, 0xFF)
+
+ for key, bm := range b.bitmaps {
+ bucketBitBase := int64(key) << 32
+ if bucketBitBase >= length {
+ continue
+ }
+ dense := bm.ToDense()
+ if len(dense) == 0 {
+ continue
+ }
+ // Cap the bucket's bit range to what fits in `length`.
+ bucketBits := int64(len(dense)) * 64
+ if bucketBitBase+bucketBits > length {
+ bucketBits = length - bucketBitBase
+ }
+ // bucketBitBase = key << 32 is always 8-byte-aligned, so the
+ // BitmapWordWriter runs with offset=0 internally. The
trailing-byte
+ // loop below relies on that alignment — PutNextTrailingByte's
+ // validBits=8 path is byte-aligned only when offset == 0.
+ wr := bitutil.NewBitmapWordWriter(out, int(bucketBitBase),
int(bucketBits))
+ full := int(bucketBits / 64)
+ for i := 0; i < full; i++ {
+ wr.PutNextWord(^dense[i])
+ }
+ if rem := int(bucketBits & 63); rem > 0 {
+ last := ^dense[full]
+ for rem > 0 {
+ valid := 8
+ if rem < 8 {
+ valid = rem
+ }
+ wr.PutNextTrailingByte(byte(last), valid)
+ last >>= 8
+ rem -= valid
+ }
+ }
+ }
+ // Pad bits past length-1 in the trailing byte must be 0, but the
+ // memory.Set(out, 0xFF) prefill above set them to 1; clear them once
+ // all bucket writes complete.
+ if pad := int64(len(out))*8 - length; pad > 0 {
+ bitutil.SetBitsTo(out, length, pad, false)
+ }
+
+ return out
}
diff --git a/table/dv/roaring_bitmap_test.go b/table/dv/roaring_bitmap_test.go
index f060b7d4..bccc7e9c 100644
--- a/table/dv/roaring_bitmap_test.go
+++ b/table/dv/roaring_bitmap_test.go
@@ -22,6 +22,7 @@ import (
"encoding/binary"
"testing"
+ "github.com/RoaringBitmap/roaring/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -147,9 +148,161 @@ func TestDeserializeRoaringBitmapTruncatedAfterKey(t
*testing.T) {
assert.ErrorContains(t, err, "read bitmap for key 0")
}
-// Why: Set, Contains, Cardinality, and gap handling are the core in-memory
behaviors of this type.
-// Condition: set positions across keys 0, 1, and 3, leaving key 2 absent.
-// Assertion: cardinality counts all set positions, expected positions are
present, and unset positions in the same key, a gap key, and a far key are
absent.
+// TestKeepMaskBytes pins KeepMaskBytes' contract: a bit-packed []byte where
+// bit i is 1 iff position i is NOT in the bitmap, sized to ⌈length/8⌉ bytes,
+// matching Arrow Boolean buffer layout so it can be wrapped without
re-packing.
+func TestKeepMaskBytes(t *testing.T) {
+ // bitAt returns the LSB-first bit at position i in mask.
+ bitAt := func(mask []byte, i int64) bool {
+ return mask[i>>3]&(1<<uint(i&7)) != 0
+ }
+
+ t.Run("empty bitmap: all bits kept", func(t *testing.T) {
+ bm := NewRoaringPositionBitmap()
+ mask := bm.KeepMaskBytes(10)
+ require.Len(t, mask, 2) // ⌈10/8⌉ = 2
+ for i := int64(0); i < 10; i++ {
+ assert.True(t, bitAt(mask, i), "bit %d should be kept",
i)
+ }
+ // Bits 10..15 in the trailing byte must be cleared so callers
+ // inspecting raw bytes don't see phantom keep bits past length.
+ assert.Equal(t, byte(0b00000011), mask[1])
+ })
+
+ t.Run("single bucket: deleted bits cleared, others kept", func(t
*testing.T) {
+ bm := NewRoaringPositionBitmap()
+ bm.Set(0)
+ bm.Set(3)
+ bm.Set(7)
+ bm.Set(9)
+
+ mask := bm.KeepMaskBytes(16)
+ require.Len(t, mask, 2)
+ for _, deleted := range []int64{0, 3, 7, 9} {
+ assert.False(t, bitAt(mask, deleted), "bit %d should be
deleted", deleted)
+ }
+ for _, kept := range []int64{1, 2, 4, 5, 6, 8, 10, 11, 12, 13,
14, 15} {
+ assert.True(t, bitAt(mask, kept), "bit %d should be
kept", kept)
+ }
+ })
+
+ t.Run("length zero returns nil", func(t *testing.T) {
+ bm := NewRoaringPositionBitmap()
+ bm.Set(0)
+ assert.Nil(t, bm.KeepMaskBytes(0))
+ })
+
+ t.Run("length shorter than bitmap: extra positions ignored", func(t
*testing.T) {
+ // Bitmap holds a delete at position 50, but the caller asks
for only
+ // the first 32 positions. The mask must not extend to position
50,
+ // and the in-range bits must all be kept.
+ bm := NewRoaringPositionBitmap()
+ bm.Set(50)
+
+ mask := bm.KeepMaskBytes(32)
+ require.Len(t, mask, 4)
+ for i := int64(0); i < 32; i++ {
+ assert.True(t, bitAt(mask, i), "bit %d should be kept",
i)
+ }
+ })
+
+ t.Run("length not byte-aligned: trailing bits cleared", func(t
*testing.T) {
+ // Length 13 → 2 bytes, bits 13..15 in byte 1 must be 0 even
when no
+ // deletes touch them. Pins the trailing-byte mask logic.
+ bm := NewRoaringPositionBitmap()
+ mask := bm.KeepMaskBytes(13)
+ require.Len(t, mask, 2)
+ assert.Equal(t, byte(0xFF), mask[0])
+ assert.Equal(t, byte(0b00011111), mask[1])
+ })
+
+ t.Run("multi-bucket: deletes from a non-zero bucket are out of range
and ignored", func(t *testing.T) {
+ // Spec parity case: a DV bitmap with positions in bucket 1
+ // (position >= 2^32) cannot fit in a realistic caller-supplied
+ // length, since allocating ⌈length/8⌉ bytes for length >= 2^32
+ // would require >= 512 MiB. KeepMaskBytes must correctly walk
every
+ // bucket and skip those whose bit offset (key << 32) is already at or past the mask
+ // length — without this guard the bucket-1 ToDense allocation
would
+ // run anyway and (worse) try to index past out[len(out)-1].
+ bm := NewRoaringPositionBitmap()
+ bm.Set(5) // bucket 0
+ bm.Set(1<<32 | 7) // bucket 1 — out of range for length 64
+ bm.Set(3<<32 | 999999) // bucket 3 — out of range for length 64
+
+ mask := bm.KeepMaskBytes(64)
+ require.Len(t, mask, 8)
+ assert.False(t, bitAt(mask, 5))
+ for i := int64(0); i < 64; i++ {
+ if i == 5 {
+ continue
+ }
+ assert.True(t, bitAt(mask, i), "bit %d should be kept",
i)
+ }
+ })
+
+ t.Run("multi-bucket within range: bucket-1 deletes land at correct
offset", func(t *testing.T) {
+ // Construct the bitmaps map manually with a bucket-1 entry to verify
+ // cross-bucket bit placement: the per-bucket bucketBitBase = key << 32
+ // (bit offset 2^32, i.e. byte offset 2^29) must place bucket-1 deletes
+ // at byte 2^29, not at byte 0. Note that the mask itself has to reach
+ // past bit 2^32, so KeepMaskBytes(length) below allocates ~512 MiB
+ // (⌈length/8⌉ bytes); the manual bucket install does not avoid that.
+ // NOTE(review): consider gating this subtest behind testing.Short().
+ const length int64 = (1 << 32) + 16 // just past the bucket-0/1
boundary
+ bm := &RoaringPositionBitmap{bitmaps:
make(map[uint32]*roaring.Bitmap)}
+ bm.Set(5) // bucket 0
+ // Directly install a bucket-1 entry with low-bits 7 set, which is
+ // exactly what Set((1<<32)+7) would produce.
+ bucket1 := roaring.New()
+ bucket1.Add(7)
+ bm.bitmaps[1] = bucket1
+
+ mask := bm.KeepMaskBytes(length)
+ require.Len(t, mask, int((length+7)/8))
+ assert.False(t, bitAt(mask, 5), "bucket-0 delete at position 5
must land in byte 0")
+ assert.False(t, bitAt(mask, (1<<32)+7), "bucket-1 delete at
position 2^32+7 must land at byte 2^29")
+ // Adjacent positions in each bucket should be kept.
+ assert.True(t, bitAt(mask, 4))
+ assert.True(t, bitAt(mask, 6))
+ assert.True(t, bitAt(mask, (1<<32)+6))
+ assert.True(t, bitAt(mask, (1<<32)+8))
+ })
+
+ // Pins PutNextTrailingByte's per-byte loop in KeepMaskBytes: when a
+ // bucket's bit range stops mid-word (bucketBits & 63 != 0), the loop
+ // must place delete bits inside the trailing partial word. Without
+ // coverage, an off-by-one in `last >>= 8` or `valid = min(8, rem)`
+ // would silently keep a deleted row visible.
+ for _, tc := range []struct {
+ name string
+ length int64
+ deletePos uint64
+ expectBits []int64 // additional positions that must be kept
+ }{
+ // rem = length % 64. For each rem in {1, 7, 8, 63}, place a
+ // deletion at the LAST in-range bit so the delete lands in the
+ // byte written by the trailing loop's final iteration.
+ {name: "rem=1: delete at length-1", length: 65, deletePos: 64,
expectBits: []int64{0, 32, 63}},
+ {name: "rem=7: delete at length-1", length: 71, deletePos: 70,
expectBits: []int64{0, 63, 64, 69}},
+ {name: "rem=8: delete at length-1", length: 72, deletePos: 71,
expectBits: []int64{0, 64, 70}},
+ {name: "rem=63: delete at length-1", length: 127, deletePos:
126, expectBits: []int64{0, 64, 125}},
+ } {
+ t.Run(tc.name, func(t *testing.T) {
+ bm := NewRoaringPositionBitmap()
+ bm.Set(tc.deletePos)
+
+ mask := bm.KeepMaskBytes(tc.length)
+ require.Len(t, mask, int((tc.length+7)/8))
+ assert.False(t, bitAt(mask, int64(tc.deletePos)),
+ "position %d (length-1) must be deleted",
tc.deletePos)
+ for _, kept := range tc.expectBits {
+ assert.True(t, bitAt(mask, kept),
+ "position %d must be kept", kept)
+ }
+ })
+ }
+}
+
func TestRoaringBitmapSetContainsAndCardinality(t *testing.T) {
bm := NewRoaringPositionBitmap()
diff --git a/table/dv_scanner_read_test.go b/table/dv_scanner_read_test.go
new file mode 100644
index 00000000..1d038cd2
--- /dev/null
+++ b/table/dv_scanner_read_test.go
@@ -0,0 +1,329 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+ "bytes"
+ "context"
+ "encoding/binary"
+ "hash/crc32"
+ "os"
+ "path/filepath"
+ "strconv"
+ "testing"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/iceberg-go"
+ iceio "github.com/apache/iceberg-go/io"
+ "github.com/apache/iceberg-go/puffin"
+ "github.com/apache/iceberg-go/table/dv"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+// writeDVPuffinFixture writes a puffin file containing one deletion-vector-v1
+// blob for the given positions and returns the on-disk path plus the blob's
+// offset/length within the file (what a v3 manifest entry stores as
+// content_offset / content_size_in_bytes).
+//
+// The inner payload is wrapped in the spec-canonical envelope so
+// dv.DeserializeDV accepts it:
+//
+//	[4B big-endian length of magic+bitmap]
+//	[4B magic: dv.DVMagicNumber written little-endian, i.e. bytes D1 D3 39 64]
+//	[serialized roaring position bitmap]
+//	[4B big-endian CRC-32 (IEEE) over magic+bitmap]
+//
+// A Go-built payload suffices here; the cross-impl byte pin against a
+// Java-produced fixture is the job of #1041, not this test.
+//
+// TODO(#1041): replace the hand-built envelope with a dv.SerializeDV helper
+// once that PR exports one. Today the dv package only exports the read side.
+func writeDVPuffinFixture(t *testing.T, positions []uint64, referencedDataFile string) (path string, offset, length int64) {
+	t.Helper()
+
+	bitmap := dv.NewRoaringPositionBitmap()
+	for _, p := range positions {
+		bitmap.Set(p)
+	}
+
+	var bitmapBuf bytes.Buffer
+	require.NoError(t, bitmap.Serialize(&bitmapBuf))
+
+	// The length prefix covers magic + bitmap but not the trailing CRC; the
+	// CRC is computed over exactly the same magic + bitmap span.
+	magicAndBitmapLen := 4 + bitmapBuf.Len()
+	payload := make([]byte, 0, 4+magicAndBitmapLen+4)
+	payload = binary.BigEndian.AppendUint32(payload, uint32(magicAndBitmapLen))
+	payload = binary.LittleEndian.AppendUint32(payload, dv.DVMagicNumber)
+	payload = append(payload, bitmapBuf.Bytes()...)
+	// payload[4:] is magic + bitmap at this point (the length prefix is skipped).
+	payload = binary.BigEndian.AppendUint32(payload, crc32.ChecksumIEEE(payload[4:]))
+
+	var puffinBuf bytes.Buffer
+	w, err := puffin.NewWriter(&puffinBuf)
+	require.NoError(t, err)
+	blobMeta, err := w.AddBlob(puffin.BlobMetadataInput{
+		Type:           puffin.BlobTypeDeletionVector,
+		SnapshotID:     -1,
+		SequenceNumber: -1,
+		Fields:         []int32{},
+		Properties: map[string]string{
+			"referenced-data-file": referencedDataFile,
+			"cardinality":          strconv.FormatInt(bitmap.Cardinality(), 10),
+		},
+	}, payload)
+	require.NoError(t, err)
+	require.NoError(t, w.Finish())
+
+	// Each call gets its own t.TempDir(), so the fixed file name never collides.
+	path = filepath.Join(t.TempDir(), "dv.puffin")
+	require.NoError(t, os.WriteFile(path, puffinBuf.Bytes(), 0o644))
+
+	return path, blobMeta.Offset, blobMeta.Length
+}
+
+// newDVMockDataFile returns a mock manifest entry describing a single DV blob:
+// a position-delete entry in Puffin format whose referenced_data_file,
+// content_offset and content_size_in_bytes point at that blob.
+//
+// FileSizeBytes deliberately stays at mockDataFile's zero default. The
+// scanner read path only consults ContentOffset / ContentSizeInBytes, and
+// filling the file size with the blob length (rather than the puffin file
+// size) would make the fixture misleading.
+func newDVMockDataFile(puffinPath, referencedDataFile string, offset, contentSize int64) *dvMockDataFile {
+	base := mockDataFile{
+		path:        puffinPath,
+		contentType: iceberg.EntryContentPosDeletes,
+		format:      iceberg.PuffinFile,
+	}
+
+	f := &dvMockDataFile{mockDataFile: base}
+	f.referencedDataFile = strPtr(referencedDataFile)
+	f.contentOffset = int64Ptr(offset)
+	f.contentSizeInBytes = int64Ptr(contentSize)
+
+	return f
+}
+
+// Compile-time assertion of readAllDeletionVectors' signature. It pins the
+// perFileDVBitmaps return type so GetRecords' downstream wiring through
+// recordBatchesFromTasksAndDeletes can't silently shift to a different shape
+// without first breaking the build. Kept at package scope next to the DV
+// fixtures that exercise the loader.
+var _ func(context.Context, iceio.IO, []FileScanTask, int) (perFileDVBitmaps, error) = readAllDeletionVectors
+
+// TestReadAllDeletionVectors verifies the scanner-side DV loader produces a
+// perFileDVBitmaps map keyed by referenced data-file path, with the spec-
+// correctness invariants the loader is the right place to enforce: dedup by
+// referenced data file (not by puffin path, which would silently drop blobs
+// from multi-DV puffin files), rejection of missing referenced_data_file /
+// content_offset, and rejection of two distinct DV blobs targeting the same
+// data file (over-deletion guard, matching Java's DeleteFileIndex
+// ValidationException behaviour for that case).
+//
+// End-to-end coverage (DV positions actually filtered out of GetRecords
+// output for a real Parquet data file) is deferred to a follow-up — it needs
+// v3 table-metadata scaffolding that lives outside this PR's scope.
+func TestReadAllDeletionVectors(t *testing.T) {
+	ctx := context.Background()
+	fs := iceio.LocalFS{}
+
+	t.Run("decodes positions and keys by referenced data file", func(t *testing.T) {
+		const dataFilePath = "file:///table/data/data-001.parquet"
+		puffinPath, offset, length := writeDVPuffinFixture(t, []uint64{1, 3, 5, 7, 9}, dataFilePath)
+
+		tasks := []FileScanTask{{DeletionVectorFiles: []iceberg.DataFile{
+			newDVMockDataFile(puffinPath, dataFilePath, offset, length),
+		}}}
+
+		got, err := readAllDeletionVectors(ctx, fs, tasks, 1)
+		require.NoError(t, err)
+		require.Contains(t, got, dataFilePath,
+			"map must be keyed by the referenced data file, not by the DV puffin path")
+
+		// The decoded bitmap must contain exactly the positions written
+		// into the fixture.
+		bitmap := got[dataFilePath]
+		require.NotNil(t, bitmap)
+		assert.Equal(t, int64(5), bitmap.Cardinality())
+		for _, pos := range []uint64{1, 3, 5, 7, 9} {
+			assert.True(t, bitmap.Contains(pos), "expected %d to be in bitmap", pos)
+		}
+		// Spot-check a non-deleted position too.
+		assert.False(t, bitmap.Contains(0))
+	})
+
+	t.Run("dedups identical DV referenced by two tasks", func(t *testing.T) {
+		const dataFilePath = "file:///table/data/data-002.parquet"
+		puffinPath, offset, length := writeDVPuffinFixture(t, []uint64{2, 4}, dataFilePath)
+		dvFile := newDVMockDataFile(puffinPath, dataFilePath, offset, length)
+
+		tasks := []FileScanTask{
+			{DeletionVectorFiles: []iceberg.DataFile{dvFile}},
+			{DeletionVectorFiles: []iceberg.DataFile{dvFile}},
+		}
+
+		got, err := readAllDeletionVectors(ctx, fs, tasks, 2)
+		require.NoError(t, err)
+		// Same (puffin path, content offset) across both tasks is the
+		// same blob, so the second occurrence is a dedup no-op rather than
+		// a "multiple deletion vectors" error: one entry, one bitmap.
+		assert.Len(t, got, 1)
+		require.NotNil(t, got[dataFilePath])
+		assert.Equal(t, int64(2), got[dataFilePath].Cardinality())
+	})
+
+	t.Run("no tasks -> empty map, no error", func(t *testing.T) {
+		got, err := readAllDeletionVectors(ctx, fs, nil, 1)
+		require.NoError(t, err)
+		assert.Empty(t, got)
+	})
+
+	t.Run("DV missing referenced_data_file is rejected", func(t *testing.T) {
+		puffinPath, offset, length := writeDVPuffinFixture(t, []uint64{0}, "file:///placeholder.parquet")
+
+		dvFile := &dvMockDataFile{
+			mockDataFile: mockDataFile{
+				path:        puffinPath,
+				contentType: iceberg.EntryContentPosDeletes,
+				format:      iceberg.PuffinFile,
+			},
+			referencedDataFile: nil, // spec violation
+			contentOffset:      int64Ptr(offset),
+			contentSizeInBytes: int64Ptr(length),
+		}
+
+		_, err := readAllDeletionVectors(ctx, fs, []FileScanTask{{DeletionVectorFiles: []iceberg.DataFile{dvFile}}}, 1)
+		require.Error(t, err)
+		assert.Contains(t, err.Error(), "missing referenced_data_file")
+	})
+
+	t.Run("DV missing content_offset is rejected", func(t *testing.T) {
+		// content_offset is required by spec; without it ReadDV can't
+		// locate the blob. Pinned at the pre-pass so two such entries
+		// for the same data file don't silently collide as a misleading
+		// "multiple deletion vectors" dedup error.
+		dvFile := &dvMockDataFile{
+			mockDataFile: mockDataFile{
+				path:        "/irrelevant.puffin",
+				contentType: iceberg.EntryContentPosDeletes,
+				format:      iceberg.PuffinFile,
+			},
+			referencedDataFile: strPtr("file:///table/data/data.parquet"),
+			contentOffset:      nil, // spec violation
+			contentSizeInBytes: int64Ptr(42),
+		}
+
+		_, err := readAllDeletionVectors(ctx, fs, []FileScanTask{{DeletionVectorFiles: []iceberg.DataFile{dvFile}}}, 1)
+		require.Error(t, err)
+		assert.Contains(t, err.Error(), "missing content_offset")
+	})
+
+	t.Run("multiple distinct DV blobs for same data file are rejected", func(t *testing.T) {
+		// Two separate puffin files both claiming to be the DV for the same
+		// data file. Java surfaces this as a ValidationException; silently
+		// unioning the bitmaps would over-delete. sameDVBlob compares both
+		// the puffin path and the content offset, so two blobs sharing one
+		// puffin file at different offsets are meant to be rejected the same
+		// way (that variant is not exercised here).
+		const dataFilePath = "file:///table/data/data-003.parquet"
+		puffinA, offsetA, lengthA := writeDVPuffinFixture(t, []uint64{1, 2}, dataFilePath)
+		puffinB, offsetB, lengthB := writeDVPuffinFixture(t, []uint64{3, 4}, dataFilePath)
+
+		tasks := []FileScanTask{{DeletionVectorFiles: []iceberg.DataFile{
+			newDVMockDataFile(puffinA, dataFilePath, offsetA, lengthA),
+			newDVMockDataFile(puffinB, dataFilePath, offsetB, lengthB),
+		}}}
+
+		_, err := readAllDeletionVectors(ctx, fs, tasks, 1)
+		require.Error(t, err)
+		assert.Contains(t, err.Error(), "multiple deletion vectors for data file")
+	})
+
+	t.Run("nil referenced_data_file is rejected before any goroutine launches", func(t *testing.T) {
+		// Invariant: the set of unique DVs is fully built and validated
+		// before any worker goroutine fans out. If a bad entry were only
+		// detected after one or more g.Go() calls, closing the result
+		// channel could race with in-flight workers still sending to it
+		// (panic: send on closed channel). With a valid entry followed by
+		// a nil-ref entry and concurrency 4, the call must return the
+		// validation error cleanly and must not panic.
+		const dataFilePath = "file:///table/data/data-004.parquet"
+		puffinPath, offset, length := writeDVPuffinFixture(t, []uint64{42}, dataFilePath)
+		valid := newDVMockDataFile(puffinPath, dataFilePath, offset, length)
+		broken := &dvMockDataFile{
+			mockDataFile: mockDataFile{
+				path:        "/nonexistent.puffin",
+				contentType: iceberg.EntryContentPosDeletes,
+				format:      iceberg.PuffinFile,
+			},
+			referencedDataFile: nil,
+			contentOffset:      int64Ptr(0),
+			contentSizeInBytes: int64Ptr(0),
+		}
+
+		var err error
+		assert.NotPanics(t, func() {
+			_, err = readAllDeletionVectors(ctx, fs, []FileScanTask{
+				{DeletionVectorFiles: []iceberg.DataFile{valid}},
+				{DeletionVectorFiles: []iceberg.DataFile{broken}},
+			}, 4)
+		})
+		require.Error(t, err)
+		// Pin the cause: the pre-pass validation error, not some unrelated
+		// I/O failure that would also satisfy require.Error.
+		assert.Contains(t, err.Error(), "missing referenced_data_file")
+	})
+}
+
+// TestFilterByDeletionVector pins the per-batch pipeline step that applies
+// a RoaringPositionBitmap to a record batch via a Boolean keep-mask +
+// compute.Filter — the load-bearing optimization over materializing the
+// bitmap into a position set and using compute.Take.
+//
+// Spans two batches so the closure-captured absolute-position counter is
+// exercised: positions 1,3 are deleted from batch 1 (rows 0-4) and positions
+// 7,9 from batch 2 (rows 5-9). The keep-mask convention is "false = drop,
+// true = keep", so the surviving rows must be 0,2,4 in batch 1 and 5,6,8 in
+// batch 2.
+func TestFilterByDeletionVector(t *testing.T) {
+	ctx := context.Background()
+	mem := memory.NewGoAllocator()
+
+	bitmap := dv.NewRoaringPositionBitmap()
+	for _, p := range []uint64{1, 3, 7, 9} {
+		bitmap.Set(p)
+	}
+	filter := filterByDeletionVector(ctx, bitmap, 10)
+
+	// All batches share one single-column schema, so build it once.
+	schema := arrow.NewSchema([]arrow.Field{{Name: "pos", Type: arrow.PrimitiveTypes.Int64}}, nil)
+
+	// mkBatch returns a batch whose only column holds the values [start, end),
+	// so each row's value equals its absolute position in the data file.
+	mkBatch := func(start, end int64) arrow.RecordBatch {
+		bldr := array.NewInt64Builder(mem)
+		defer bldr.Release()
+		for i := start; i < end; i++ {
+			bldr.Append(i)
+		}
+		col := bldr.NewArray()
+		defer col.Release()
+
+		return array.NewRecordBatch(schema, []arrow.Array{col}, end-start)
+	}
+
+	t.Run("drops deleted rows across multiple batches", func(t *testing.T) {
+		// NOTE(review): batch1/batch2 are not released here; this assumes
+		// filter takes ownership of its input batch — confirm.
+		batch1 := mkBatch(0, 5)
+		out1, err := filter(batch1)
+		require.NoError(t, err)
+		defer out1.Release()
+		assert.Equal(t, []int64{0, 2, 4},
+			out1.Column(0).(*array.Int64).Int64Values())
+
+		batch2 := mkBatch(5, 10)
+		out2, err := filter(batch2)
+		require.NoError(t, err)
+		defer out2.Release()
+		assert.Equal(t, []int64{5, 6, 8},
+			out2.Column(0).(*array.Int64).Int64Values())
+	})
+}
diff --git a/table/scanner.go b/table/scanner.go
index 8e94088f..93261a53 100644
--- a/table/scanner.go
+++ b/table/scanner.go
@@ -681,9 +681,19 @@ func (scan *Scan) PlanFiles(ctx context.Context)
([]FileScanTask, error) {
results := make([]FileScanTask, 0, len(entries.dataEntries))
for _, e := range entries.dataEntries {
- deleteFiles, err := matchDeletesToData(e,
entries.positionalDeleteEntries)
- if err != nil {
- return nil, err
+ // Spec §Scan Planning: when a deletion vector applies to a data
+ // file, positional-delete files must NOT be applied. The DV is
+ // guaranteed to encode all prior pos-delete positions; reading
the
+ // pos-delete Parquet too would be wasteful I/O, and on a buggy
+ // writer whose DV omits prior positions, applying both would
+ // over-delete. Mirrors Java's DeleteFileIndex.forDataFile.
+ dvFiles := matchDVToData(e, dvIndex)
+ var deleteFiles []iceberg.DataFile
+ if len(dvFiles) == 0 {
+ deleteFiles, err = matchDeletesToData(e,
entries.positionalDeleteEntries)
+ if err != nil {
+ return nil, err
+ }
}
eqDeleteFiles := matchEqualityDeletesToData(e,
entries.equalityDeleteEntries)
@@ -691,7 +701,7 @@ func (scan *Scan) PlanFiles(ctx context.Context)
([]FileScanTask, error) {
File: e.DataFile(),
DeleteFiles: deleteFiles,
EqualityDeleteFiles: eqDeleteFiles,
- DeletionVectorFiles: matchDVToData(e, dvIndex),
+ DeletionVectorFiles: dvFiles,
Start: 0,
Length: e.DataFile().FileSizeBytes(),
}