This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 87c1958f feat(table): scanner deletion-vector read path (#1053)
87c1958f is described below

commit 87c1958ffac77956e00cfdf868dc7916dc51aed9
Author: Andrei Tserakhau <[email protected]>
AuthorDate: Tue May 26 18:47:26 2026 +0200

    feat(table): scanner deletion-vector read path (#1053)
    
    Closes #996. Drops the ErrNotImplemented guard in GetRecords and reads
    DVs into a per-file map of roaring bitmaps kept alongside (not merged
    into) the Parquet pos-delete map; recordsFromTask applies each file's
    bitmap as a bit-packed Boolean keep-mask via compute.FilterRecordBatch.
    Dedup is keyed by referenced data file (one puffin file can carry
    multiple blobs); distinct DVs for the same data file are rejected.
    PlanFiles also stops pairing pos-delete files with a data file when a
    DV applies, per spec.
    
    Tests build the puffin in-process so this isn't blocked on #1041.
    
    Adjacent gaps deferred to follow-ups:
    - #1048 (puffin DV header check)
    - #1049 (ReadDV cardinality)
    - #1050 (PlanFiles DV seq + multi-DV reject)
    - #1051 (GetRecords arrow leak)
    - #1052 (end-to-end DV scan test).
---
 table/arrow_scanner.go          | 233 ++++++++++++++++++++++++----
 table/dv/roaring_bitmap.go      |  85 ++++++++++-
 table/dv/roaring_bitmap_test.go | 159 ++++++++++++++++++-
 table/dv_scanner_read_test.go   | 329 ++++++++++++++++++++++++++++++++++++++++
 table/scanner.go                |  18 ++-
 5 files changed, 784 insertions(+), 40 deletions(-)

diff --git a/table/arrow_scanner.go b/table/arrow_scanner.go
index 259d5ecd..968201d9 100644
--- a/table/arrow_scanner.go
+++ b/table/arrow_scanner.go
@@ -33,6 +33,7 @@ import (
        "github.com/apache/iceberg-go"
        iceinternal "github.com/apache/iceberg-go/internal"
        iceio "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/table/dv"
        "github.com/apache/iceberg-go/table/internal"
        "github.com/apache/iceberg-go/table/substrait"
        "github.com/substrait-io/substrait-go/v8/expr"
@@ -70,11 +71,8 @@ func releasePerFilePosDeletes(deletesPerFile 
perFilePosDeletes) {
 }
 
 func readAllDeleteFiles(ctx context.Context, fs iceio.IO, tasks 
[]FileScanTask, concurrency int) (perFilePosDeletes, error) {
-       var (
-               deletesPerFile = make(perFilePosDeletes)
-               uniqueDeletes  = make(map[string]iceberg.DataFile)
-               err            error
-       )
+       deletesPerFile := make(perFilePosDeletes)
+       uniqueDeletes := make(map[string]iceberg.DataFile)
 
        for _, t := range tasks {
                for _, d := range t.DeleteFiles {
@@ -92,24 +90,33 @@ func readAllDeleteFiles(ctx context.Context, fs iceio.IO, 
tasks []FileScanTask,
                return deletesPerFile, nil
        }
 
-       g, ctx := errgroup.WithContext(ctx)
+       g, gctx := errgroup.WithContext(ctx)
        g.SetLimit(concurrency)
 
        perFileChan := make(chan map[string]*arrow.Chunked, concurrency)
        go func() {
+               // Inner g.Wait() gates the deferred channel close so a late 
worker
+               // can't send on a closed channel. Outer g.Wait() below 
collects the
+               // error — no cross-goroutine shared err variable.
                defer close(perFileChan)
                for _, v := range uniqueDeletes {
                        g.Go(func() error {
-                               deletes, err := readDeletes(ctx, fs, v)
-                               if deletes != nil {
-                                       perFileChan <- deletes
+                               deletes, err := readDeletes(gctx, fs, v)
+                               if err != nil {
+                                       return err
+                               }
+                               if deletes == nil {
+                                       return nil
+                               }
+                               select {
+                               case perFileChan <- deletes:
+                                       return nil
+                               case <-gctx.Done():
+                                       return gctx.Err()
                                }
-
-                               return err
                        })
                }
-
-               err = g.Wait()
+               _ = g.Wait()
        }()
 
        for deletes := range perFileChan {
@@ -118,7 +125,130 @@ func readAllDeleteFiles(ctx context.Context, fs iceio.IO, 
tasks []FileScanTask,
                }
        }
 
-       return deletesPerFile, err
+       if err := g.Wait(); err != nil {
+               return nil, err
+       }
+
+       return deletesPerFile, nil
+}
+
+// perFileDVBitmaps maps each data-file path to the deletion-vector bitmap
+// that applies to it. Kept separate from perFilePosDeletes so the pipeline
+// can apply a bit-packed keep-mask (KeepMaskBytes) via FilterRecordBatch,
+// instead of materializing positions into a set[int64] + Take.
+type perFileDVBitmaps = map[string]*dv.RoaringPositionBitmap
+
+// readAllDeletionVectors reads every deletion-vector puffin blob referenced
+// by the input tasks and returns a perFileDVBitmaps map keyed by the
+// referenced data-file path.
+//
+// Dedup is by referenced-data-file path, not by puffin file path: a single
+// puffin file can carry multiple DV blobs (one per data file). Keying by the
+// puffin path would silently drop all but the first blob. This matches Java's
+// DeleteFileIndex.findDV, which keys by data-file path. As a side-effect we
+// can detect spec violations: two distinct DV blobs targeting the same data
+// file is rejected (mirrors Java's "Can't index multiple DVs for %s"
+// ValidationException — over-deletion risk if silently unioned).
+//
+// Validation happens up front, before any goroutines are launched, so the
+// goroutine fan-out has no early-exit path. (An early return after g.Go
+// dispatches but before g.Wait would close resultsChan while in-flight
+// workers were still sending, panicking with "send on closed channel".)
+func readAllDeletionVectors(ctx context.Context, fs iceio.IO, tasks 
[]FileScanTask, concurrency int) (perFileDVBitmaps, error) {
+       out := make(perFileDVBitmaps)
+       uniqueDVs := make(map[string]iceberg.DataFile)
+
+       for _, t := range tasks {
+               for _, d := range t.DeletionVectorFiles {
+                       ref := d.ReferencedDataFile()
+                       if ref == nil {
+                               return nil, fmt.Errorf("deletion vector %s 
missing referenced_data_file", d.FilePath())
+                       }
+                       if d.ContentOffset() == nil || d.ContentSizeInBytes() 
== nil {
+                               // Spec §Manifest Files: content_offset and 
content_size_in_
+                               // bytes are required for DV entries. Surface 
the missing-
+                               // field cause directly here — otherwise the 
dedup check
+                               // below would produce the misleading "multiple 
deletion
+                               // vectors" error when two equally-broken 
entries collide.
+                               return nil, fmt.Errorf("deletion vector %s 
missing content_offset/content_size_in_bytes", d.FilePath())
+                       }
+                       if existing, seen := uniqueDVs[*ref]; seen {
+                               if !sameDVBlob(existing, d) {
+                                       return nil, fmt.Errorf(
+                                               "multiple deletion vectors for 
data file %s: %s and %s",
+                                               *ref, existing.FilePath(), 
d.FilePath())
+                               }
+
+                               continue
+                       }
+                       uniqueDVs[*ref] = d
+               }
+       }
+
+       if len(uniqueDVs) == 0 {
+               return out, nil
+       }
+
+       type dvResult struct {
+               referencedDataFile string
+               bitmap             *dv.RoaringPositionBitmap
+       }
+
+       g, gctx := errgroup.WithContext(ctx)
+       g.SetLimit(concurrency)
+
+       resultsChan := make(chan dvResult, concurrency)
+       go func() {
+               // g.Wait() before the deferred close so workers finish sending
+               // before the channel is closed — otherwise a late worker would
+               // panic on send to a closed channel. The error is collected via
+               // the outer g.Wait() below, not stored on a shared variable.
+               defer close(resultsChan)
+               for ref, dvFile := range uniqueDVs {
+                       g.Go(func() error {
+                               bitmap, err := dv.ReadDV(fs, dvFile)
+                               if err != nil {
+                                       return fmt.Errorf("read deletion vector 
%s: %w", dvFile.FilePath(), err)
+                               }
+                               select {
+                               case resultsChan <- 
dvResult{referencedDataFile: ref, bitmap: bitmap}:
+                                       return nil
+                               case <-gctx.Done():
+                                       return gctx.Err()
+                               }
+                       })
+               }
+               _ = g.Wait()
+       }()
+
+       for r := range resultsChan {
+               out[r.referencedDataFile] = r.bitmap
+       }
+
+       if err := g.Wait(); err != nil {
+               return nil, err
+       }
+
+       return out, nil
+}
+
+// sameDVBlob reports whether two DV manifest entries point at the same puffin
+// blob — identical puffin file path and content offset. A mismatch in either
+// means two distinct DVs for the same data file, the over-deletion case
+// readAllDeletionVectors rejects.
+//
+// Java's DeleteFileIndex is stricter: any second DV for the same data file is
+// rejected with ValidationException("Can't index multiple DVs for %s"), even
+// when both entries reference the same underlying blob. Same-blob dedup here
+// is a deliberate divergence — reading the same blob twice is wasteful, not
+// incorrect. ContentOffset is required to be non-nil by the spec and by the
+// pre-pass in readAllDeletionVectors, so the comparison below assumes both.
+func sameDVBlob(a, b iceberg.DataFile) bool {
+       if a.FilePath() != b.FilePath() {
+               return false
+       }
+
+       return *a.ContentOffset() == *b.ContentOffset()
 }
 
 func readDeletes(ctx context.Context, fs iceio.IO, dataFile iceberg.DataFile) 
(_ map[string]*arrow.Chunked, err error) {
@@ -211,6 +341,43 @@ func processPositionalDeletes(ctx context.Context, deletes 
set[int64]) recProces
        }
 }
 
+// filterByDeletionVector returns a pipeline step that drops rows present in
+// the bitmap by precomputing a bit-packed keep-mask covering the whole file
+// once and slicing the relevant range per batch into 
compute.FilterRecordBatch.
+//
+// The mask layout matches Arrow's Boolean buffer convention (LSB-first per
+// byte, little-endian word order), so dv.KeepMaskBytes -> 
memory.NewBufferBytes
+// -> array.NewBoolean is a zero-copy wrap. The keepBits slice is Go-allocated
+// (GC-friendly) and shared across every per-batch Boolean array; each
+// array.NewBoolean / array.NewSlice pair is released after the batch.
+//
+// rowCount bounds the mask to the data file's row count. The closure-captured
+// nextIdx tracks absolute position across batches, mirroring
+// processPositionalDeletes.
+func filterByDeletionVector(ctx context.Context, bitmap 
*dv.RoaringPositionBitmap, rowCount int64) recProcessFn {
+       nextIdx := int64(0)
+       keepBits := bitmap.KeepMaskBytes(rowCount)
+       buf := memory.NewBufferBytes(keepBits)
+
+       return func(r arrow.RecordBatch) (arrow.RecordBatch, error) {
+               defer r.Release()
+
+               currentIdx := nextIdx
+               nextIdx += r.NumRows()
+
+               // Wrap (and slice) the shared keep-mask buffer for this batch.
+               // array.NewSlice on a Boolean array tracks the bit-level 
offset,
+               // so we don't need byte-aligned slicing — currentIdx can land
+               // anywhere within a byte.
+               full := array.NewBoolean(int(rowCount), buf, nil, 0)
+               defer full.Release()
+               sliced := array.NewSlice(full, currentIdx, 
nextIdx).(*array.Boolean)
+               defer sliced.Release()
+
+               return compute.FilterRecordBatch(ctx, r, sliced, 
compute.DefaultFilterOptions())
+       }
+}
+
 // enrichRecordsWithPosDeleteFields enriches a RecordBatch with the columns 
declared in the PositionalDeleteArrowSchema
 // so that during the pipeline filtering stages that sheds filtered out 
records, we still have a way to
 // preserve the original position of those records.
@@ -561,7 +728,7 @@ func (as *arrowScan) processRecords(
        return err
 }
 
-func (as *arrowScan) recordsFromTask(ctx context.Context, task 
internal.Enumerated[FileScanTask], out chan<- enumeratedRecord, 
positionalDeletes positionDeletes, eqDeleteSets []*equalityDeleteSet) (err 
error) {
+func (as *arrowScan) recordsFromTask(ctx context.Context, task 
internal.Enumerated[FileScanTask], out chan<- enumeratedRecord, 
positionalDeletes positionDeletes, dvBitmap *dv.RoaringPositionBitmap, 
eqDeleteSets []*equalityDeleteSet) (err error) {
        defer func() {
                if err != nil {
                        out <- enumeratedRecord{Task: task, Err: err}
@@ -582,7 +749,7 @@ func (as *arrowScan) recordsFromTask(ctx context.Context, 
task internal.Enumerat
        }
        defer iceinternal.CheckedClose(rdr, &err)
 
-       pipeline := make([]recProcessFn, 0, 2)
+       pipeline := make([]recProcessFn, 0, 3)
        if len(positionalDeletes) > 0 {
                deletes := set[int64]{}
                for _, chunk := range positionalDeletes {
@@ -596,6 +763,14 @@ func (as *arrowScan) recordsFromTask(ctx context.Context, 
task internal.Enumerat
                pipeline = append(pipeline, processPositionalDeletes(ctx, 
deletes))
        }
 
+       // PlanFiles skips Parquet pos-delete matching for any data file that
+       // has a DV (per spec), so in practice the two are mutually exclusive
+       // per task. Append after the pos-delete step anyway so a manually-
+       // constructed task with both sources still gets both filters applied.
+       if dvBitmap != nil && !dvBitmap.IsEmpty() {
+               pipeline = append(pipeline, filterByDeletionVector(ctx, 
dvBitmap, task.Value.File.Count()))
+       }
+
        if len(eqDeleteSets) > 0 {
                eqFn, eqErr := processEqualityDeletes(ctx, eqDeleteSets)
                if eqErr != nil {
@@ -822,7 +997,7 @@ func createIterator(ctx context.Context, numWorkers uint, 
records <-chan enumera
        }
 }
 
-func (as *arrowScan) recordBatchesFromTasksAndDeletes(ctx context.Context, 
tasks []FileScanTask, deletesPerFile perFilePosDeletes, eqDeleteSets 
map[int][]*equalityDeleteSet) iter.Seq2[arrow.RecordBatch, error] {
+func (as *arrowScan) recordBatchesFromTasksAndDeletes(ctx context.Context, 
tasks []FileScanTask, deletesPerFile perFilePosDeletes, dvBitmaps 
perFileDVBitmaps, eqDeleteSets map[int][]*equalityDeleteSet) 
iter.Seq2[arrow.RecordBatch, error] {
        extSet := substrait.NewExtensionSet()
        as.nameMapping = as.metadata.NameMapping()
 
@@ -847,8 +1022,10 @@ func (as *arrowScan) recordBatchesFromTasksAndDeletes(ctx 
context.Context, tasks
                                                return
                                        }
 
+                                       filePath := task.Value.File.FilePath()
                                        if err := as.recordsFromTask(ctx, task, 
records,
-                                               
deletesPerFile[task.Value.File.FilePath()],
+                                               deletesPerFile[filePath],
+                                               dvBitmaps[filePath],
                                                eqDeleteSets[task.Index]); err 
!= nil {
                                                cancel(err)
 
@@ -876,14 +1053,6 @@ func (as *arrowScan) recordBatchesFromTasksAndDeletes(ctx 
context.Context, tasks
 }
 
 func (as *arrowScan) GetRecords(ctx context.Context, tasks []FileScanTask) 
(*arrow.Schema, iter.Seq2[arrow.RecordBatch, error], error) {
-       for _, t := range tasks {
-               if len(t.DeletionVectorFiles) > 0 {
-                       return nil, nil, fmt.Errorf(
-                               "%w: deletion vector read is not yet 
implemented, data file: %s has %d deletion vector(s)",
-                               iceberg.ErrNotImplemented, t.File.FilePath(), 
len(t.DeletionVectorFiles))
-               }
-       }
-
        var err error
        as.useLargeTypes, err = 
strconv.ParseBool(as.options.Get(ScanOptionArrowUseLargeTypes, "false"))
        if err != nil {
@@ -910,6 +1079,16 @@ func (as *arrowScan) GetRecords(ctx context.Context, 
tasks []FileScanTask) (*arr
                return nil, nil, err
        }
 
+       // DV bitmaps stay in their native form rather than being materialized
+       // into int64 positions and merged with the Parquet pos-delete map.
+       // filterByDeletionVector applies the bitmap to each batch via a Boolean
+       // keep-mask + compute.Filter — O(1) Contains lookups, vectorized 
Filter,
+       // no intermediate position set.
+       dvBitmaps, err := readAllDeletionVectors(ctx, as.fs, tasks, 
as.concurrency)
+       if err != nil {
+               return nil, nil, err
+       }
+
        eqDeleteSets, err := readAllEqualityDeleteFiles(ctx, as.fs,
                as.metadata.CurrentSchema(), tasks, as.concurrency)
        if err != nil {
@@ -919,5 +1098,5 @@ func (as *arrowScan) GetRecords(ctx context.Context, tasks 
[]FileScanTask) (*arr
                return nil, nil, err
        }
 
-       return resultSchema, as.recordBatchesFromTasksAndDeletes(ctx, tasks, 
deletesPerFile, eqDeleteSets), nil
+       return resultSchema, as.recordBatchesFromTasksAndDeletes(ctx, tasks, 
deletesPerFile, dvBitmaps, eqDeleteSets), nil
 }
diff --git a/table/dv/roaring_bitmap.go b/table/dv/roaring_bitmap.go
index cb25844a..a51ddc51 100644
--- a/table/dv/roaring_bitmap.go
+++ b/table/dv/roaring_bitmap.go
@@ -22,11 +22,11 @@ import (
        "encoding/binary"
        "fmt"
        "io"
-       "maps"
-       "slices"
        "sort"
 
        "github.com/RoaringBitmap/roaring/v2"
+       "github.com/apache/arrow-go/v18/arrow/bitutil"
+       "github.com/apache/arrow-go/v18/arrow/memory"
        "github.com/apache/iceberg-go/puffin"
 )
 
@@ -76,7 +76,11 @@ func (b *RoaringPositionBitmap) Contains(pos uint64) bool {
        return bm.Contains(low)
 }
 
-// IsEmpty returns true if no positions are set.
+// IsEmpty returns true if no positions are set. Returns true both for the
+// no-bucket case and for the (currently impossible-via-public-API) case
+// where a bucket exists but its inner roaring bitmap has zero cardinality
+// — the latter only matters if a future Remove-style method ever lets a
+// bucket drop to empty without being deleted from the map.
 func (b *RoaringPositionBitmap) IsEmpty() bool {
        return b.Cardinality() == 0
 }
@@ -160,7 +164,76 @@ func DeserializeRoaringPositionBitmap(data []byte) 
(*RoaringPositionBitmap, erro
        return b, nil
 }
 
-// sortedKeys returns the bitmap keys in ascending order.
-func (b *RoaringPositionBitmap) sortedKeys() []uint32 {
-       return slices.Sorted(maps.Keys(b.bitmaps))
+// KeepMaskBytes returns a bit-packed []byte of length ⌈length/8⌉ where bit i
+// (LSB-first within a byte) is 1 iff position i is NOT in the bitmap. The
+// layout matches Arrow Boolean buffer convention so callers can wrap the
+// result via memory.NewBufferBytes / array.NewBoolean without re-packing.
+//
+// length bounds the range of positions the caller cares about — typically the
+// data file's row count. Bits past length-1 in the final byte are cleared.
+// Positions in the bitmap that fall outside [0, length) are ignored, so a
+// caller can safely pass a length smaller than the bitmap's max position
+// (e.g. when the file row count is below a stale upper bound).
+//
+// Bucket-key arithmetic is exact: each 32-bit bucket covers exactly 2^32
+// positions, so per-bucket bit offsets are 8-byte-aligned and the writer can
+// pack inverted dense words straight in. bitutil.BitmapWordWriter handles
+// host-endianness internally (PutNextWord LE-packs regardless of platform),
+// so the helper is portable on any GOARCH.
+func (b *RoaringPositionBitmap) KeepMaskBytes(length int64) []byte {
+       if length <= 0 {
+               return nil
+       }
+       out := make([]byte, bitutil.BytesForBits(length))
+       // Pre-fill ALL bits to 1 (keep) before any bucket write. 
BitmapWordWriter
+       // does full-word stores (not read-modify-write), so the un-deleted bits
+       // in each 8-byte word land at 1 only because this initialization
+       // happened. A future refactor that moves bucket writes before the
+       // memory.Set, or removes it entirely, would silently corrupt the mask.
+       memory.Set(out, 0xFF)
+
+       for key, bm := range b.bitmaps {
+               bucketBitBase := int64(key) << 32
+               if bucketBitBase >= length {
+                       continue
+               }
+               dense := bm.ToDense()
+               if len(dense) == 0 {
+                       continue
+               }
+               // Cap the bucket's bit range to what fits in `length`.
+               bucketBits := int64(len(dense)) * 64
+               if bucketBitBase+bucketBits > length {
+                       bucketBits = length - bucketBitBase
+               }
+               // bucketBitBase = key << 32 is always 8-byte-aligned, so the
+               // BitmapWordWriter runs with offset=0 internally. The 
trailing-byte
+               // loop below relies on that alignment — PutNextTrailingByte's
+               // validBits=8 path is byte-aligned only when offset == 0.
+               wr := bitutil.NewBitmapWordWriter(out, int(bucketBitBase), 
int(bucketBits))
+               full := int(bucketBits / 64)
+               for i := 0; i < full; i++ {
+                       wr.PutNextWord(^dense[i])
+               }
+               if rem := int(bucketBits & 63); rem > 0 {
+                       last := ^dense[full]
+                       for rem > 0 {
+                               valid := 8
+                               if rem < 8 {
+                                       valid = rem
+                               }
+                               wr.PutNextTrailingByte(byte(last), valid)
+                               last >>= 8
+                               rem -= valid
+                       }
+               }
+       }
+       // Pad bits past length-1 in the trailing byte must be 0; the writer can
+       // paint into them when bucketBits stops mid-byte, so the clear has to
+       // happen after all bucket writes complete.
+       if pad := int64(len(out))*8 - length; pad > 0 {
+               bitutil.SetBitsTo(out, length, pad, false)
+       }
+
+       return out
 }
diff --git a/table/dv/roaring_bitmap_test.go b/table/dv/roaring_bitmap_test.go
index f060b7d4..bccc7e9c 100644
--- a/table/dv/roaring_bitmap_test.go
+++ b/table/dv/roaring_bitmap_test.go
@@ -22,6 +22,7 @@ import (
        "encoding/binary"
        "testing"
 
+       "github.com/RoaringBitmap/roaring/v2"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
 )
@@ -147,9 +148,161 @@ func TestDeserializeRoaringBitmapTruncatedAfterKey(t 
*testing.T) {
        assert.ErrorContains(t, err, "read bitmap for key 0")
 }
 
-// Why: Set, Contains, Cardinality, and gap handling are the core in-memory 
behaviors of this type.
-// Condition: set positions across keys 0, 1, and 3, leaving key 2 absent.
-// Assertion: cardinality counts all set positions, expected positions are 
present, and unset positions in the same key, a gap key, and a far key are 
absent.
+// TestKeepMaskBytes pins KeepMaskBytes' contract: a bit-packed []byte where
+// bit i is 1 iff position i is NOT in the bitmap, sized to ⌈length/8⌉ bytes,
+// matching Arrow Boolean buffer layout so it can be wrapped without 
re-packing.
+func TestKeepMaskBytes(t *testing.T) {
+       // bitAt returns the LSB-first bit at position i in mask.
+       bitAt := func(mask []byte, i int64) bool {
+               return mask[i>>3]&(1<<uint(i&7)) != 0
+       }
+
+       t.Run("empty bitmap: all bits kept", func(t *testing.T) {
+               bm := NewRoaringPositionBitmap()
+               mask := bm.KeepMaskBytes(10)
+               require.Len(t, mask, 2) // ⌈10/8⌉ = 2
+               for i := int64(0); i < 10; i++ {
+                       assert.True(t, bitAt(mask, i), "bit %d should be kept", 
i)
+               }
+               // Bits 10..15 in the trailing byte must be cleared so callers
+               // inspecting raw bytes don't see phantom keep bits past length.
+               assert.Equal(t, byte(0b00000011), mask[1])
+       })
+
+       t.Run("single bucket: deleted bits cleared, others kept", func(t 
*testing.T) {
+               bm := NewRoaringPositionBitmap()
+               bm.Set(0)
+               bm.Set(3)
+               bm.Set(7)
+               bm.Set(9)
+
+               mask := bm.KeepMaskBytes(16)
+               require.Len(t, mask, 2)
+               for _, deleted := range []int64{0, 3, 7, 9} {
+                       assert.False(t, bitAt(mask, deleted), "bit %d should be 
deleted", deleted)
+               }
+               for _, kept := range []int64{1, 2, 4, 5, 6, 8, 10, 11, 12, 13, 
14, 15} {
+                       assert.True(t, bitAt(mask, kept), "bit %d should be 
kept", kept)
+               }
+       })
+
+       t.Run("length zero returns nil", func(t *testing.T) {
+               bm := NewRoaringPositionBitmap()
+               bm.Set(0)
+               assert.Nil(t, bm.KeepMaskBytes(0))
+       })
+
+       t.Run("length shorter than bitmap: extra positions ignored", func(t 
*testing.T) {
+               // Bitmap holds a delete at position 50, but the caller asks 
for only
+               // the first 32 positions. The mask must not extend to position 
50,
+               // and the in-range bits must all be kept.
+               bm := NewRoaringPositionBitmap()
+               bm.Set(50)
+
+               mask := bm.KeepMaskBytes(32)
+               require.Len(t, mask, 4)
+               for i := int64(0); i < 32; i++ {
+                       assert.True(t, bitAt(mask, i), "bit %d should be kept", 
i)
+               }
+       })
+
+       t.Run("length not byte-aligned: trailing bits cleared", func(t 
*testing.T) {
+               // Length 13 → 2 bytes, bits 13..15 in byte 1 must be 0 even 
when no
+               // deletes touch them. Pins the trailing-byte mask logic.
+               bm := NewRoaringPositionBitmap()
+               mask := bm.KeepMaskBytes(13)
+               require.Len(t, mask, 2)
+               assert.Equal(t, byte(0xFF), mask[0])
+               assert.Equal(t, byte(0b00011111), mask[1])
+       })
+
+       t.Run("multi-bucket: deletes from a non-zero bucket are out of range 
and ignored", func(t *testing.T) {
+               // Spec parity case: a DV bitmap with positions in bucket 1
+               // (position >= 2^32) cannot fit in a realistic caller-supplied
+               // length, since allocating ⌈length/8⌉ bytes for length >= 2^32
+               // would require >= 512 MiB. KeepMaskBytes must correctly walk 
every
+               // bucket and skip those whose byte-offset already exceeds the 
mask
+               // length — without this guard the bucket-1 ToDense allocation 
would
+               // run anyway and (worse) try to index past out[len(out)-1].
+               bm := NewRoaringPositionBitmap()
+               bm.Set(5)              // bucket 0
+               bm.Set(1<<32 | 7)      // bucket 1 — out of range for length 64
+               bm.Set(3<<32 | 999999) // bucket 3 — out of range for length 64
+
+               mask := bm.KeepMaskBytes(64)
+               require.Len(t, mask, 8)
+               assert.False(t, bitAt(mask, 5))
+               for i := int64(0); i < 64; i++ {
+                       if i == 5 {
+                               continue
+                       }
+                       assert.True(t, bitAt(mask, i), "bit %d should be kept", 
i)
+               }
+       })
+
+       t.Run("multi-bucket within range: bucket-1 deletes land at correct 
offset", func(t *testing.T) {
+               // Construct the bitmaps map manually with a synthetic small-key
+               // "bucket 1" so the test can verify cross-bucket bit placement
+               // without allocating 512 MiB. The internal layout is exactly 
the
+               // same as Set would produce for a real position >= 2^32; this 
just
+               // dodges the high-position allocation. Verifies that the 
per-bucket
+               // bucketBitBase = key << 32 (bit offset 2^32, i.e. byte offset 
2^29)
+               // places bucket-1 deletes at byte 2^29, not at byte 0.
+               const length int64 = (1 << 32) + 16 // just past the bucket-0/1 
boundary
+               bm := &RoaringPositionBitmap{bitmaps: 
make(map[uint32]*roaring.Bitmap)}
+               bm.Set(5) // bucket 0
+               // Directly install a bucket-1 entry with low-bits 7 set, which 
is
+               // what Set((1<<32)+7) would produce — but avoids the 512 MiB 
mask.
+               bucket1 := roaring.New()
+               bucket1.Add(7)
+               bm.bitmaps[1] = bucket1
+
+               mask := bm.KeepMaskBytes(length)
+               require.Len(t, mask, int((length+7)/8))
+               assert.False(t, bitAt(mask, 5), "bucket-0 delete at position 5 
must land in byte 0")
+               assert.False(t, bitAt(mask, (1<<32)+7), "bucket-1 delete at 
position 2^32+7 must land at byte 2^29")
+               // Adjacent positions in each bucket should be kept.
+               assert.True(t, bitAt(mask, 4))
+               assert.True(t, bitAt(mask, 6))
+               assert.True(t, bitAt(mask, (1<<32)+6))
+               assert.True(t, bitAt(mask, (1<<32)+8))
+       })
+
+       // Pins PutNextTrailingByte's per-byte loop in KeepMaskBytes: when a
+       // bucket's bit range stops mid-word (bucketBits & 63 != 0), the loop
+       // must place delete bits inside the trailing partial word. Without
+       // coverage, an off-by-one in `last >>= 8` or `valid = min(8, rem)`
+       // would silently keep a deleted row visible.
+       for _, tc := range []struct {
+               name       string
+               length     int64
+               deletePos  uint64
+               expectBits []int64 // additional positions that must be kept
+       }{
+               // rem = length % 64. For each rem in {1, 7, 8, 63}, place a
+               // deletion at the LAST in-range bit so the trailing loop must
+               // reach validBits = rem on its final iteration.
+               {name: "rem=1: delete at length-1", length: 65, deletePos: 64, 
expectBits: []int64{0, 32, 63}},
+               {name: "rem=7: delete at length-1", length: 71, deletePos: 70, 
expectBits: []int64{0, 63, 64, 69}},
+               {name: "rem=8: delete at length-1", length: 72, deletePos: 71, 
expectBits: []int64{0, 64, 70}},
+               {name: "rem=63: delete at length-1", length: 127, deletePos: 
126, expectBits: []int64{0, 64, 125}},
+       } {
+               t.Run(tc.name, func(t *testing.T) {
+                       bm := NewRoaringPositionBitmap()
+                       bm.Set(tc.deletePos)
+
+                       mask := bm.KeepMaskBytes(tc.length)
+                       require.Len(t, mask, int((tc.length+7)/8))
+                       assert.False(t, bitAt(mask, int64(tc.deletePos)),
+                               "position %d (length-1) must be deleted", 
tc.deletePos)
+                       for _, kept := range tc.expectBits {
+                               assert.True(t, bitAt(mask, kept),
+                                       "position %d must be kept", kept)
+                       }
+               })
+       }
+}
+
 func TestRoaringBitmapSetContainsAndCardinality(t *testing.T) {
        bm := NewRoaringPositionBitmap()
 
diff --git a/table/dv_scanner_read_test.go b/table/dv_scanner_read_test.go
new file mode 100644
index 00000000..1d038cd2
--- /dev/null
+++ b/table/dv_scanner_read_test.go
@@ -0,0 +1,329 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "bytes"
+       "context"
+       "encoding/binary"
+       "hash/crc32"
+       "os"
+       "path/filepath"
+       "strconv"
+       "testing"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/puffin"
+       "github.com/apache/iceberg-go/table/dv"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+// writeDVPuffinFixture writes a puffin file containing one deletion-vector-v1
+// blob for the given positions and returns the on-disk path plus the blob's
+// offset/length within the file (what a v3 manifest entry stores as
+// content_offset / content_size_in_bytes).
+//
+// The inner payload is wrapped in the spec-canonical [4B BE length][4B LE
+// magic 0xD1D33964][bitmap][4B BE CRC32] envelope so dv.DeserializeDV
+// accepts it. A Go-built payload suffices here; the cross-impl byte pin
+// against a Java-produced fixture is the job of #1041, not this test.
+//
+// Each call writes into its own t.TempDir(), so several fixtures created in
+// one test (e.g. two DVs for the same data file) never overwrite each other.
+//
+// TODO(#1041): replace the hand-built envelope with a dv.SerializeDV helper
+// once that PR exports one. Today the dv package exposes the bitmap
+// Serialize used below, but no helper that produces the framed envelope.
+func writeDVPuffinFixture(t *testing.T, positions []uint64, referencedDataFile 
string) (path string, offset, length int64) {
+       t.Helper()
+
+       bitmap := dv.NewRoaringPositionBitmap()
+       for _, p := range positions {
+               bitmap.Set(p)
+       }
+
+       var bitmapBuf bytes.Buffer
+       require.NoError(t, bitmap.Serialize(&bitmapBuf))
+
+       // Length covers magic + bitmap, excludes CRC.
+       magicAndBitmap := make([]byte, 4+bitmapBuf.Len())
+       binary.LittleEndian.PutUint32(magicAndBitmap[:4], dv.DVMagicNumber)
+       copy(magicAndBitmap[4:], bitmapBuf.Bytes())
+
+       // Frame it: BE length prefix, then magic+bitmap, then a BE CRC-32
+       // (IEEE) computed over magic+bitmap only (the length is not checksummed).
+       payload := make([]byte, 4+len(magicAndBitmap)+4)
+       binary.BigEndian.PutUint32(payload[:4], uint32(len(magicAndBitmap)))
+       copy(payload[4:4+len(magicAndBitmap)], magicAndBitmap)
+       binary.BigEndian.PutUint32(payload[4+len(magicAndBitmap):], 
crc32.ChecksumIEEE(magicAndBitmap))
+
+       var puffinBuf bytes.Buffer
+       w, err := puffin.NewWriter(&puffinBuf)
+       require.NoError(t, err)
+       // NOTE(review): the -1 snapshot/sequence values appear to follow the
+       // Puffin spec's convention for deletion-vector-v1 blobs — confirm.
+       blobMeta, err := w.AddBlob(puffin.BlobMetadataInput{
+               Type:           puffin.BlobTypeDeletionVector,
+               SnapshotID:     -1,
+               SequenceNumber: -1,
+               Fields:         []int32{},
+               Properties: map[string]string{
+                       "referenced-data-file": referencedDataFile,
+                       "cardinality":          
strconv.FormatInt(bitmap.Cardinality(), 10),
+               },
+       }, payload)
+       require.NoError(t, err)
+       require.NoError(t, w.Finish())
+
+       path = filepath.Join(t.TempDir(), "dv.puffin")
+       require.NoError(t, os.WriteFile(path, puffinBuf.Bytes(), 0o644))
+
+       // Offset/length come from the writer's blob metadata, i.e. the values
+       // a manifest entry would record for this blob.
+       return path, blobMeta.Offset, blobMeta.Length
+}
+
+// newDVMockDataFile builds a manifest-entry-shaped mock for a DV puffin blob.
+// offset and contentSize are the blob's position and length inside
+// puffinPath, typically the values returned by writeDVPuffinFixture.
+// FileSizeBytes is intentionally left at zero (mockDataFile's default): the
+// scanner read path consults ContentOffset / ContentSizeInBytes only, and
+// setting filesize to the blob length (rather than the puffin file size)
+// invites a misleading test fixture.
+//
+// Tests that need to model spec violations (a nil referenced data file or a
+// nil content offset) construct dvMockDataFile directly instead.
+func newDVMockDataFile(puffinPath, referencedDataFile string, offset, 
contentSize int64) *dvMockDataFile {
+       return &dvMockDataFile{
+               mockDataFile: mockDataFile{
+                       path:        puffinPath,
+                       contentType: iceberg.EntryContentPosDeletes,
+                       format:      iceberg.PuffinFile,
+               },
+               referencedDataFile: strPtr(referencedDataFile),
+               contentOffset:      int64Ptr(offset),
+               contentSizeInBytes: int64Ptr(contentSize),
+       }
+}
+
+// Compile-time assertion of readAllDeletionVectors' signature — pins the
+// `perFileDVBitmaps` return type so GetRecords' downstream wiring through
+// recordBatchesFromTasksAndDeletes can't silently shift to a different shape
+// without breaking the build first. Placed at package scope, ahead of the
+// tests that call readAllDeletionVectors, so the contract is easy to find.
+var _ func(context.Context, iceio.IO, []FileScanTask, int) (perFileDVBitmaps, 
error) = readAllDeletionVectors
+
+// TestReadAllDeletionVectors verifies the scanner-side DV loader produces a
+// perFileDVBitmaps map keyed by referenced data-file path, with the spec-
+// correctness invariants the loader is the right place to enforce: dedup by
+// referenced data file (not by puffin path, which would silently drop blobs
+// from multi-DV puffin files), rejection of missing referenced_data_file /
+// content_offset, and rejection of two distinct DV blobs targeting the same
+// data file (over-deletion guard, matching Java's DeleteFileIndex
+// ValidationException behaviour for that case). A final subtest pins that
+// mixing a valid and an invalid entry returns an error instead of panicking
+// in the loader's goroutine fan-out.
+//
+// End-to-end coverage (DV positions actually filtered out of GetRecords
+// output for a real Parquet data file) is deferred to a follow-up — would
+// need v3 table-metadata scaffolding that lives outside this PR's scope.
+func TestReadAllDeletionVectors(t *testing.T) {
+       ctx := context.Background()
+       fs := iceio.LocalFS{}
+
+       t.Run("decodes positions and keys by referenced data file", func(t 
*testing.T) {
+               const dataFilePath = "file:///table/data/data-001.parquet"
+               puffinPath, offset, length := writeDVPuffinFixture(t, 
[]uint64{1, 3, 5, 7, 9}, dataFilePath)
+
+               tasks := []FileScanTask{{DeletionVectorFiles: 
[]iceberg.DataFile{
+                       newDVMockDataFile(puffinPath, dataFilePath, offset, 
length),
+               }}}
+
+               got, err := readAllDeletionVectors(ctx, fs, tasks, 1)
+               require.NoError(t, err)
+               require.Contains(t, got, dataFilePath,
+                       "map must be keyed by the referenced data file, not by 
the DV puffin path")
+
+               // Bitmap should expose the spec-mandated positions via Contains
+               // (checked point-wise here; per-batch filtering uses a keep-mask).
+               bitmap := got[dataFilePath]
+               require.NotNil(t, bitmap)
+               assert.Equal(t, int64(5), bitmap.Cardinality())
+               for _, pos := range []uint64{1, 3, 5, 7, 9} {
+                       assert.True(t, bitmap.Contains(pos), "expected %d to be 
in bitmap", pos)
+               }
+               // Spot-check a non-deleted position too.
+               assert.False(t, bitmap.Contains(0))
+       })
+
+       t.Run("dedups identical DV referenced by two tasks", func(t *testing.T) 
{
+               const dataFilePath = "file:///table/data/data-002.parquet"
+               puffinPath, offset, length := writeDVPuffinFixture(t, 
[]uint64{2, 4}, dataFilePath)
+               dvFile := newDVMockDataFile(puffinPath, dataFilePath, offset, 
length)
+
+               // Both tasks share the very same DV entry (same puffin path and
+               // content offset), as happens when one DV spans several tasks.
+               tasks := []FileScanTask{
+                       {DeletionVectorFiles: []iceberg.DataFile{dvFile}},
+                       {DeletionVectorFiles: []iceberg.DataFile{dvFile}},
+               }
+
+               got, err := readAllDeletionVectors(ctx, fs, tasks, 2)
+               require.NoError(t, err)
+               // Same (puffin path, content offset) across both tasks is the
+               // same blob — the second occurrence is a dedup no-op, so the
+               // single map entry stays a single bitmap (not a list of two).
+               require.NotNil(t, got[dataFilePath])
+               assert.Equal(t, int64(2), got[dataFilePath].Cardinality())
+       })
+
+       t.Run("no tasks -> empty map, no error", func(t *testing.T) {
+               got, err := readAllDeletionVectors(ctx, fs, nil, 1)
+               require.NoError(t, err)
+               assert.Empty(t, got)
+       })
+
+       t.Run("DV missing referenced_data_file is rejected", func(t *testing.T) 
{
+               puffinPath, offset, length := writeDVPuffinFixture(t, 
[]uint64{0}, "file:///placeholder.parquet")
+
+               dvFile := &dvMockDataFile{
+                       mockDataFile: mockDataFile{
+                               path:        puffinPath,
+                               contentType: iceberg.EntryContentPosDeletes,
+                               format:      iceberg.PuffinFile,
+                       },
+                       referencedDataFile: nil, // spec violation
+                       contentOffset:      int64Ptr(offset),
+                       contentSizeInBytes: int64Ptr(length),
+               }
+
+               _, err := readAllDeletionVectors(ctx, fs, 
[]FileScanTask{{DeletionVectorFiles: []iceberg.DataFile{dvFile}}}, 1)
+               require.Error(t, err)
+               assert.Contains(t, err.Error(), "missing referenced_data_file")
+       })
+
+       t.Run("DV missing content_offset is rejected", func(t *testing.T) {
+               // content_offset is required by spec; without it ReadDV can't
+               // locate the blob. Pinned at the pre-pass so two such entries
+               // for the same data file don't silently collide as a misleading
+               // "multiple deletion vectors" dedup error.
+               dvFile := &dvMockDataFile{
+                       mockDataFile: mockDataFile{
+                               path:        "/irrelevant.puffin",
+                               contentType: iceberg.EntryContentPosDeletes,
+                               format:      iceberg.PuffinFile,
+                       },
+                       referencedDataFile: 
strPtr("file:///table/data/data.parquet"),
+                       contentOffset:      nil, // spec violation
+                       contentSizeInBytes: int64Ptr(42),
+               }
+
+               _, err := readAllDeletionVectors(ctx, fs, 
[]FileScanTask{{DeletionVectorFiles: []iceberg.DataFile{dvFile}}}, 1)
+               require.Error(t, err)
+               assert.Contains(t, err.Error(), "missing content_offset")
+       })
+
+       t.Run("multiple distinct DV blobs for same data file are rejected", 
func(t *testing.T) {
+               // Two separate puffin files both claiming to be the DV for the 
same
+               // data file. Java surfaces this as a ValidationException; 
silently
+               // unioning the bitmaps would over-delete. The loader is also 
the
+               // right place to catch the case where two blobs share a puffin 
file
+               // but live at different content offsets — sameDVBlob compares 
both.
+               // NOTE(review): the fixtures below live in two different puffin
+               // files, so only the distinct-path case is exercised here; the
+               // same-file/different-offset case is not covered by this test.
+               const dataFilePath = "file:///table/data/data-003.parquet"
+               puffinA, offsetA, lengthA := writeDVPuffinFixture(t, 
[]uint64{1, 2}, dataFilePath)
+               puffinB, offsetB, lengthB := writeDVPuffinFixture(t, 
[]uint64{3, 4}, dataFilePath)
+
+               tasks := []FileScanTask{{DeletionVectorFiles: 
[]iceberg.DataFile{
+                       newDVMockDataFile(puffinA, dataFilePath, offsetA, 
lengthA),
+                       newDVMockDataFile(puffinB, dataFilePath, offsetB, 
lengthB),
+               }}}
+
+               _, err := readAllDeletionVectors(ctx, fs, tasks, 1)
+               require.Error(t, err)
+               assert.Contains(t, err.Error(), "multiple deletion vectors for 
data file")
+       })
+
+       t.Run("nil referenced_data_file is rejected before any goroutine 
launches", func(t *testing.T) {
+               // The prior implementation validated inside the 
goroutine-dispatch
+               // loop, so a bad entry hit AFTER one or more g.Go() calls would
+               // defer-close the result channel while in-flight workers were 
still
+               // sending to it (panic: send on closed channel). The current 
structure
+               // makes the original buggy state unreachable by construction —
+               // uniqueDVs is fully built and validated before any goroutine 
fans
+               // out. This test pins the invariant: with both a valid and a
+               // nil-ref entry in the input, the call returns a clean error 
and
+               // does not panic, regardless of how concurrency is configured.
+               const dataFilePath = "file:///table/data/data-004.parquet"
+               puffinPath, offset, length := writeDVPuffinFixture(t, 
[]uint64{42}, dataFilePath)
+               valid := newDVMockDataFile(puffinPath, dataFilePath, offset, 
length)
+               broken := &dvMockDataFile{
+                       mockDataFile: mockDataFile{
+                               path:        "/nonexistent.puffin",
+                               contentType: iceberg.EntryContentPosDeletes,
+                               format:      iceberg.PuffinFile,
+                       },
+                       referencedDataFile: nil,
+                       contentOffset:      int64Ptr(0),
+                       contentSizeInBytes: int64Ptr(0),
+               }
+
+               // Declared outside the closure so the result can be asserted
+               // after NotPanics returns.
+               var err error
+               assert.NotPanics(t, func() {
+                       _, err = readAllDeletionVectors(ctx, fs, []FileScanTask{
+                               {DeletionVectorFiles: 
[]iceberg.DataFile{valid}},
+                               {DeletionVectorFiles: 
[]iceberg.DataFile{broken}},
+                       }, 4)
+               })
+               require.Error(t, err)
+       })
+}
+
+// TestFilterByDeletionVector pins the per-batch pipeline step that applies
+// a RoaringPositionBitmap to a record batch via Boolean keep-mask +
+// compute.Filter — the load-bearing optimization over materializing the
+// bitmap into a position set and using compute.Take.
+//
+// Spans two batches so the closure-captured absolute-position counter is
+// exercised: positions 1,3 deleted from batch 1 (rows 0-4), positions 7,9
+// from batch 2 (rows 5-9). The keep-mask logic is "false = drop, true =
+// keep", so the surviving rows must be 0,2,4 (batch 1) and 5,6,8 (batch 2).
+func TestFilterByDeletionVector(t *testing.T) {
+       ctx := context.Background()
+       mem := memory.NewGoAllocator()
+
+       bitmap := dv.NewRoaringPositionBitmap()
+       for _, p := range []uint64{1, 3, 7, 9} {
+               bitmap.Set(p)
+       }
+       // NOTE(review): 10 presumably bounds the keep-mask to the data file's
+       // total row count (both batches together) — confirm against
+       // filterByDeletionVector.
+       filter := filterByDeletionVector(ctx, bitmap, 10)
+
+       // mkBatch builds a single-column Int64 batch whose values equal their
+       // absolute row positions [start, end), so surviving values can be
+       // compared directly against the expected kept positions.
+       mkBatch := func(start, end int64) arrow.RecordBatch {
+               bldr := array.NewInt64Builder(mem)
+               defer bldr.Release()
+               for i := start; i < end; i++ {
+                       bldr.Append(i)
+               }
+               col := bldr.NewArray()
+               // The record batch is expected to retain its own reference to col.
+               defer col.Release()
+               schema := arrow.NewSchema([]arrow.Field{{Name: "pos", Type: 
arrow.PrimitiveTypes.Int64}}, nil)
+
+               return array.NewRecordBatch(schema, []arrow.Array{col}, 
end-start)
+       }
+
+       t.Run("drops deleted rows across multiple batches", func(t *testing.T) {
+               // NOTE(review): batch1/batch2 are not released by the test; this
+               // presumably relies on filter taking ownership of its input
+               // batch — confirm before adding Release calls here.
+               batch1 := mkBatch(0, 5)
+               out1, err := filter(batch1)
+               require.NoError(t, err)
+               defer out1.Release()
+               assert.Equal(t, []int64{0, 2, 4},
+                       out1.Column(0).(*array.Int64).Int64Values())
+
+               // Batch 2 must be filtered by absolute position (5-9), not by
+               // its batch-local row index (0-4).
+               batch2 := mkBatch(5, 10)
+               out2, err := filter(batch2)
+               require.NoError(t, err)
+               defer out2.Release()
+               assert.Equal(t, []int64{5, 6, 8},
+                       out2.Column(0).(*array.Int64).Int64Values())
+       })
+}
diff --git a/table/scanner.go b/table/scanner.go
index 8e94088f..93261a53 100644
--- a/table/scanner.go
+++ b/table/scanner.go
@@ -681,9 +681,19 @@ func (scan *Scan) PlanFiles(ctx context.Context) 
([]FileScanTask, error) {
 
        results := make([]FileScanTask, 0, len(entries.dataEntries))
        for _, e := range entries.dataEntries {
-               deleteFiles, err := matchDeletesToData(e, 
entries.positionalDeleteEntries)
-               if err != nil {
-                       return nil, err
+               // Spec §Scan Planning: when a deletion vector applies to a data
+               // file, positional-delete files must NOT be applied. The DV is
+               // guaranteed to encode all prior pos-delete positions; reading 
the
+               // pos-delete Parquet too would be wasteful I/O, and on a buggy
+               // writer whose DV omits prior positions, applying both would
+               // over-delete. Mirrors Java's DeleteFileIndex.forDataFile.
+               dvFiles := matchDVToData(e, dvIndex)
+               var deleteFiles []iceberg.DataFile
+               if len(dvFiles) == 0 {
+                       deleteFiles, err = matchDeletesToData(e, 
entries.positionalDeleteEntries)
+                       if err != nil {
+                               return nil, err
+                       }
                }
                eqDeleteFiles := matchEqualityDeletesToData(e, 
entries.equalityDeleteEntries)
 
@@ -691,7 +701,7 @@ func (scan *Scan) PlanFiles(ctx context.Context) 
([]FileScanTask, error) {
                        File:                e.DataFile(),
                        DeleteFiles:         deleteFiles,
                        EqualityDeleteFiles: eqDeleteFiles,
-                       DeletionVectorFiles: matchDVToData(e, dvIndex),
+                       DeletionVectorFiles: dvFiles,
                        Start:               0,
                        Length:              e.DataFile().FileSizeBytes(),
                }


Reply via email to