This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 2a37a19e feat(table/dv): plumb cardinality validation through ReadDV (#1056)
2a37a19e is described below

commit 2a37a19e4be40f471d9eeba2ceeeb25e0077e051
Author: Andrei Tserakhau <[email protected]>
AuthorDate: Tue May 12 18:33:39 2026 +0200

    feat(table/dv): plumb cardinality validation through ReadDV (#1056)
    
    Closes #1049. ReadDV used to pass -1 to DeserializeDV unconditionally,
    skipping the spec-mandated cardinality check entirely. The puffin blob
    property "cardinality" — what the writer attaches — is now read out of
    the BlobMetadata that matches the manifest entry's (offset, size) and
    fed into DeserializeDV, so a truncated bitmap whose CRC still validates
    over the bytes that are present gets caught.
    
    Property missing → log a warning and skip cardinality validation for
    backward compatibility with writers that omit it (the CRC check still
    applies).
    
    Property present but unparseable → hard error; silently falling back
    would mask writer bugs.
---
 table/dv/deletion_vector.go      |  89 ++++++++++++++++++++++++++--
 table/dv/deletion_vector_test.go | 124 ++++++++++++++++++++++++++++++++++++++-
 2 files changed, 206 insertions(+), 7 deletions(-)

diff --git a/table/dv/deletion_vector.go b/table/dv/deletion_vector.go
index 2b618c89..735ac6fb 100644
--- a/table/dv/deletion_vector.go
+++ b/table/dv/deletion_vector.go
@@ -21,12 +21,21 @@ import (
        "encoding/binary"
        "fmt"
        "hash/crc32"
+       "log/slog"
+       "strconv"
 
        "github.com/apache/iceberg-go"
        iceio "github.com/apache/iceberg-go/io"
        "github.com/apache/iceberg-go/puffin"
 )
 
+// dvCardinalityProperty is the spec-mandated puffin blob property under which
+// a deletion-vector-v1 blob declares how many row positions its inner roaring
+// bitmap deletes. The writer attaches it; ReadDV parses it and hands it to
+// DeserializeDV, which checks it against the decoded bitmap. That catches
+// truncated bitmaps whose CRC still validates (the CRC covers the bytes that
+// ARE present, not the bytes that should have been). ReadDV tolerates a
+// missing property (logging a warning) but rejects a value that is not a
+// non-negative base-10 int64.
+const dvCardinalityProperty = "cardinality"
+
 const (
        // DVMagicNumber is the magic number for deletion vectors.
        // Spec bytes: D1 D3 39 64 (big-endian) = 0x6439D3D1 (little-endian 
uint32)
@@ -94,6 +103,23 @@ func DeserializeDV(data []byte, expectedCardinality int64) 
(*RoaringPositionBitm
 
 // ReadDV reads a deletion vector from a puffin file using the manifest entry 
metadata.
 // ContentOffset and ContentSizeInBytes must be set on the DataFile (required 
by v3 spec).
+//
+// When the puffin blob carries a `cardinality` property (spec-mandated for
+// deletion-vector-v1), its value is parsed and used to validate the decoded
+// bitmap — this catches truncated or partially-overwritten blobs whose CRC
+// still validates over the bytes that are present.
+//
+// Java's BitmapPositionDeleteIndex validates against the manifest entry's
+// `record_count` field rather than the puffin blob property; the two are
+// always set to the same value by Java's writer, so ReadDV's behavior agrees
+// with Java for spec-conformant tables. A future change should cross-validate
+// both sources when they're independently available (tracked separately).
+//
+// Blobs missing the spec-required cardinality property are accepted with a
+// slog warning rather than rejected — strict enforcement is deferred until
+// the Go DV writer guarantees the property is always present. The per-byte
+// CRC check in DeserializeDV still applies, so missing-property is degraded
+// integrity, not absent integrity.
 func ReadDV(fs iceio.IO, dvFile iceberg.DataFile) (*RoaringPositionBitmap, 
error) {
        if dvFile.FileFormat() != iceberg.PuffinFile {
                return nil, fmt.Errorf("expected PUFFIN format for deletion 
vector, got %s", dvFile.FileFormat())
@@ -125,8 +151,63 @@ func ReadDV(fs iceio.IO, dvFile iceberg.DataFile) 
(*RoaringPositionBitmap, error
                return nil, fmt.Errorf("read DV blob at offset %d: %w", offset, 
err)
        }
 
-       // Pass -1 to skip cardinality validation during deserialization.
-       // dvFile.Count() defaults to 0 when unset, which would incorrectly
-       // reject valid DVs. Callers can validate cardinality separately.
-       return DeserializeDV(blobData, -1)
+       cardinality, ok, err := blobCardinality(reader.Blobs(), offset, size)
+       if err != nil {
+               return nil, fmt.Errorf("DV file %s: %w", dvFile.FilePath(), err)
+       }
+       if !ok {
+               // Spec deviation: deletion-vector-v1 MUST carry a cardinality
+               // property. We tolerate its absence to keep reading files from
+               // third-party writers that omit it, and log a warning so
+               // operators can identify affected tables.
+               slog.Warn("DV blob missing spec-required cardinality property; 
skipping cardinality validation",
+                       "dv_file", dvFile.FilePath(), "offset", offset)
+               cardinality = -1
+       }
+
+       return DeserializeDV(blobData, cardinality)
+}
+
+// blobCardinality finds the puffin footer entry the manifest entry points at
+// (same starting offset, and it must also have the same length) and reports
+// the deleted-position count that entry declares via its cardinality
+// property.
+//
+// Outcomes:
+//
+//   - (n, true, nil)  — entry found; property present and a non-negative int64
+//   - (0, false, nil) — entry found, but it carries no cardinality property
+//   - (0, false, err) — no entry at offset, length disagrees with the
+//     manifest, or the property value is unparseable or negative
+//
+// Reporting presence through the bool rather than a magic int64 keeps
+// DeserializeDV's "-1 disables validation" convention out of this helper.
+func blobCardinality(blobs []puffin.BlobMetadata, offset, size int64) (int64, bool, error) {
+       var match *puffin.BlobMetadata
+       for i := range blobs {
+               if blobs[i].Offset == offset {
+                       match = &blobs[i]
+
+                       break
+               }
+       }
+       if match == nil {
+               return 0, false, fmt.Errorf("no blob in puffin footer at offset %d, size %d", offset, size)
+       }
+
+       // Starting where the manifest says but with a different length is a
+       // different writer bug from "nothing at this offset", so it gets its
+       // own message.
+       if match.Length != size {
+               return 0, false, fmt.Errorf("blob at offset %d has length %d, manifest says %d", offset, match.Length, size)
+       }
+
+       raw, present := match.Properties[dvCardinalityProperty]
+       if !present {
+               return 0, false, nil
+       }
+
+       n, err := strconv.ParseInt(raw, 10, 64)
+       switch {
+       case err != nil:
+               return 0, false, fmt.Errorf("invalid %s property %q: %w", dvCardinalityProperty, raw, err)
+       case n < 0:
+               // A negative count of deleted positions is meaningless, and -1
+               // in particular would alias DeserializeDV's skip-validation
+               // sentinel and quietly switch the check off.
+               return 0, false, fmt.Errorf("%s property must be non-negative, got %d", dvCardinalityProperty, n)
+       }
+
+       return n, true, nil
 }
diff --git a/table/dv/deletion_vector_test.go b/table/dv/deletion_vector_test.go
index 1ea87142..e6e68470 100644
--- a/table/dv/deletion_vector_test.go
+++ b/table/dv/deletion_vector_test.go
@@ -93,6 +93,15 @@ func wrapDVPayloadForTest(bitmapBytes []byte) []byte {
 }
 
 func writePuffinWithDVBlob(t *testing.T, dir string, dvBlobBytes []byte) 
(string, puffin.BlobMetadata) {
+       return writePuffinWithDVBlobAndProps(t, dir, dvBlobBytes, 
map[string]string{
+               "referenced-data-file": "s3://bucket/data/data-001.parquet",
+       })
+}
+
+// writePuffinWithDVBlobAndProps is the same as writePuffinWithDVBlob but with
+// caller-supplied blob properties — used by the cardinality-validation tests
+// to inject the spec-mandated `cardinality` property (or omit it).
+func writePuffinWithDVBlobAndProps(t *testing.T, dir string, dvBlobBytes 
[]byte, props map[string]string) (string, puffin.BlobMetadata) {
        t.Helper()
 
        path := filepath.Join(dir, "test-dv.puffin")
@@ -108,9 +117,7 @@ func writePuffinWithDVBlob(t *testing.T, dir string, 
dvBlobBytes []byte) (string
                SnapshotID:     -1,
                SequenceNumber: -1,
                Fields:         []int32{2147483546},
-               Properties: map[string]string{
-                       "referenced-data-file": 
"s3://bucket/data/data-001.parquet",
-               },
+               Properties:     props,
        }, dvBlobBytes)
        require.NoError(t, err)
        require.NoError(t, w.Finish())
@@ -380,3 +387,114 @@ func TestReadDVInvalidBlobRange(t *testing.T) {
        _, err := ReadDV(iceio.LocalFS{}, newDVTestFile(path, 5, &offset, 
&size))
        assert.ErrorContains(t, err, "read DV blob at offset 0")
 }
+
+// TestReadDVCardinalityValidation pins the spec-mandated check that the puffin
+// blob's `cardinality` property agrees with the bitmap's decoded cardinality.
+// The check catches truncated or partially-overwritten blobs whose CRC still
+// validates over the bytes that are present. The reference fixture encodes 5
+// positions.
+func TestReadDVCardinalityValidation(t *testing.T) {
+       dvBlobBytes := readDVTestData(t, "small-alternating-values-position-index.bin")
+
+       const referencedDataFile = "s3://bucket/data/data-001.parquet"
+
+       // withCardinality builds blob properties carrying the raw `cardinality`
+       // value v next to the usual referenced-data-file property.
+       withCardinality := func(v string) map[string]string {
+               return map[string]string{
+                       "referenced-data-file": referencedDataFile,
+                       "cardinality":          v,
+               }
+       }
+
+       // Cases that differ only in the blob properties handed to the writer;
+       // the manifest entry always carries the footer's own (offset, length).
+       propertyCases := []struct {
+               name    string
+               props   map[string]string
+               wantErr string // substring of the expected error; "" means success
+       }{
+               {
+                       name:  "matching cardinality passes",
+                       props: withCardinality("5"),
+               },
+               {
+                       // Bitmap actually has 5 positions; claim 99. Pins
+                       // DeserializeDV's "cardinality mismatch", not the
+                       // helper's "invalid cardinality" parse error.
+                       name:    "mismatched cardinality is rejected",
+                       props:   withCardinality("99"),
+                       wantErr: "cardinality mismatch",
+               },
+               {
+                       // No `cardinality` property: a spec deviation tolerated for
+                       // read compatibility with writers that omit it. ReadDV logs
+                       // a warning (not asserted here) and falls back to the
+                       // CRC-only validation inside DeserializeDV.
+                       name:  "missing property is tolerated with a warning",
+                       props: map[string]string{"referenced-data-file": referencedDataFile},
+               },
+               {
+                       // Present but unparseable: silently falling back to -1
+                       // would mask writer bugs, so this is a hard error.
+                       name:    "malformed property is rejected",
+                       props:   withCardinality("not-a-number"),
+                       wantErr: "invalid cardinality",
+               },
+               {
+                       // -1 aliases DeserializeDV's internal skip-validation
+                       // sentinel; accepting it from blob properties would
+                       // silently disable the very check under test, so every
+                       // negative value is rejected up front.
+                       name:    "negative cardinality is rejected (not silently treated as skip-validation sentinel)",
+                       props:   withCardinality("-1"),
+                       wantErr: "must be non-negative",
+               },
+       }
+
+       for _, tc := range propertyCases {
+               t.Run(tc.name, func(t *testing.T) {
+                       path, meta := writePuffinWithDVBlobAndProps(t, t.TempDir(), dvBlobBytes, tc.props)
+                       offset, size := meta.Offset, meta.Length
+                       bm, err := ReadDV(iceio.LocalFS{}, newDVTestFile(path, 5, &offset, &size))
+                       if tc.wantErr != "" {
+                               require.ErrorContains(t, err, tc.wantErr)
+
+                               return
+                       }
+                       require.NoError(t, err)
+                       assert.Equal(t, int64(5), bm.Cardinality())
+               })
+       }
+
+       t.Run("manifest offset not found in footer is rejected with a precise error", func(t *testing.T) {
+               // The manifest entry points at an in-bounds offset that doesn't
+               // match any footer blob's starting position. This surfaces as the
+               // helper's "no blob in puffin footer" error rather than the
+               // deeper CRC error from DeserializeDV, which is a clearer signal
+               // for diagnosing manifest corruption.
+               path, meta := writePuffinWithDVBlob(t, t.TempDir(), dvBlobBytes)
+               // Shifting the offset by 1 keeps the ReadAt range inside the blob
+               // region (no "extends into footer" error) while guaranteeing no
+               // footer blob starts at that exact offset.
+               wrongOffset, smallSize := meta.Offset+1, int64(8)
+               _, err := ReadDV(iceio.LocalFS{}, newDVTestFile(path, 5, &wrongOffset, &smallSize))
+               require.ErrorContains(t, err, "no blob in puffin footer")
+       })
+
+       t.Run("manifest size disagrees with footer length at matching offset", func(t *testing.T) {
+               // Same starting offset but a different length: a distinct writer
+               // bug from "no blob at offset", reported with the precise
+               // "blob at offset N has length M, manifest says K" message.
+               path, meta := writePuffinWithDVBlob(t, t.TempDir(), dvBlobBytes)
+               offset, wrongSize := meta.Offset, meta.Length-1
+               _, err := ReadDV(iceio.LocalFS{}, newDVTestFile(path, 5, &offset, &wrongSize))
+               require.ErrorContains(t, err, "manifest says")
+       })
+}

Reply via email to