This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 2a37a19e feat(table/dv): plumb cardinality validation through ReadDV (#1056)
2a37a19e is described below
commit 2a37a19e4be40f471d9eeba2ceeeb25e0077e051
Author: Andrei Tserakhau <[email protected]>
AuthorDate: Tue May 12 18:33:39 2026 +0200
feat(table/dv): plumb cardinality validation through ReadDV (#1056)
Closes #1049. ReadDV used to pass -1 to DeserializeDV unconditionally,
skipping the spec-mandated cardinality check entirely. The puffin blob
property "cardinality" — what the writer attaches — is now read out of
the BlobMetadata that matches the manifest entry's (offset, size) and
fed into DeserializeDV, so a truncated bitmap whose CRC still validates
over the bytes that are present gets caught.
Property missing → log a warning and skip cardinality validation for
backward compatibility with writers that omit it (the CRC check still applies).
Property present but unparseable → hard error; silently falling back
would mask writer bugs.
---
table/dv/deletion_vector.go | 89 ++++++++++++++++++++++++++--
table/dv/deletion_vector_test.go | 124 ++++++++++++++++++++++++++++++++++++++-
2 files changed, 206 insertions(+), 7 deletions(-)
diff --git a/table/dv/deletion_vector.go b/table/dv/deletion_vector.go
index 2b618c89..735ac6fb 100644
--- a/table/dv/deletion_vector.go
+++ b/table/dv/deletion_vector.go
@@ -21,12 +21,21 @@ import (
"encoding/binary"
"fmt"
"hash/crc32"
+ "log/slog"
+ "strconv"
"github.com/apache/iceberg-go"
iceio "github.com/apache/iceberg-go/io"
"github.com/apache/iceberg-go/puffin"
)
+// dvCardinalityProperty is the spec-mandated blob property name carrying the
+// number of deleted row positions encoded in the inner roaring bitmap. The
+// writer attaches it; the reader uses it to detect truncated bitmaps whose
+// CRC happens to validate (the CRC covers the bytes that ARE present, not
+// the bytes that should have been).
+const dvCardinalityProperty = "cardinality"
+
const (
// DVMagicNumber is the magic number for deletion vectors.
// Spec bytes: D1 D3 39 64 (big-endian) = 0x6439D3D1 (little-endian
uint32)
@@ -94,6 +103,23 @@ func DeserializeDV(data []byte, expectedCardinality int64)
(*RoaringPositionBitm
// ReadDV reads a deletion vector from a puffin file using the manifest entry
metadata.
// ContentOffset and ContentSizeInBytes must be set on the DataFile (required
by v3 spec).
+//
+// When the puffin blob carries a `cardinality` property (spec-mandated for
+// deletion-vector-v1), its value is parsed and used to validate the decoded
+// bitmap — this catches truncated or partially-overwritten blobs whose CRC
+// still validates over the bytes that are present.
+//
+// Java's BitmapPositionDeleteIndex validates against the manifest entry's
+// `record_count` field rather than the puffin blob property; the two are
+// always set to the same value by Java's writer, so ReadDV's behavior agrees
+// with Java for spec-conformant tables. A future change should cross-validate
+// both sources when they're independently available (tracked separately).
+//
+// Blobs missing the spec-required cardinality property are accepted with a
+// slog warning rather than rejected — strict enforcement is deferred until
+// the Go DV writer guarantees the property is always present. The per-byte
+// CRC check in DeserializeDV still applies, so missing-property is degraded
+// integrity, not absent integrity.
func ReadDV(fs iceio.IO, dvFile iceberg.DataFile) (*RoaringPositionBitmap,
error) {
if dvFile.FileFormat() != iceberg.PuffinFile {
return nil, fmt.Errorf("expected PUFFIN format for deletion
vector, got %s", dvFile.FileFormat())
@@ -125,8 +151,63 @@ func ReadDV(fs iceio.IO, dvFile iceberg.DataFile)
(*RoaringPositionBitmap, error
return nil, fmt.Errorf("read DV blob at offset %d: %w", offset,
err)
}
- // Pass -1 to skip cardinality validation during deserialization.
- // dvFile.Count() defaults to 0 when unset, which would incorrectly
- // reject valid DVs. Callers can validate cardinality separately.
- return DeserializeDV(blobData, -1)
+ cardinality, ok, err := blobCardinality(reader.Blobs(), offset, size)
+ if err != nil {
+ return nil, fmt.Errorf("DV file %s: %w", dvFile.FilePath(), err)
+ }
+ if !ok {
+ // Spec deviation: deletion-vector-v1 MUST carry a cardinality
+ // property. We tolerate the absence to keep reading from third-
+ // party writers that emit non-conformant files; flag so
operators
+ // can identify affected tables.
+ slog.Warn("DV blob missing spec-required cardinality property;
skipping cardinality validation",
+ "dv_file", dvFile.FilePath(), "offset", offset)
+ cardinality = -1
+ }
+
+ return DeserializeDV(blobData, cardinality)
+}
+
+// blobCardinality locates the puffin footer entry the manifest points at (the
+// blob starting at offset) and reports the cardinality that blob declares.
+//
+// Outcomes:
+//   - (n, true, nil): the blob declares a non-negative `cardinality` of n.
+//   - (0, false, nil): the blob exists but has no `cardinality` property.
+//   - (0, false, err): no footer blob starts at offset, the footer length
+//     disagrees with size, or the property is not a non-negative base-10
+//     int64.
+//
+// Absence is signalled through the bool instead of a magic int64 so that
+// DeserializeDV's "-1 means skip" convention stays local to ReadDV.
+func blobCardinality(blobs []puffin.BlobMetadata, offset, size int64) (int64, bool, error) {
+	// The first footer entry starting at offset is the one the manifest
+	// refers to.
+	var match *puffin.BlobMetadata
+	for i := range blobs {
+		if blobs[i].Offset == offset {
+			match = &blobs[i]
+			break
+		}
+	}
+
+	if match == nil {
+		return 0, false, fmt.Errorf("no blob in puffin footer at offset %d, size %d", offset, size)
+	}
+
+	// Matching start but a different length is a distinct manifest/footer
+	// disagreement; report it separately from "no blob at offset".
+	if match.Length != size {
+		return 0, false, fmt.Errorf("blob at offset %d has length %d, manifest says %d", offset, match.Length, size)
+	}
+
+	raw, present := match.Properties[dvCardinalityProperty]
+	if !present {
+		return 0, false, nil
+	}
+
+	n, err := strconv.ParseInt(raw, 10, 64)
+	switch {
+	case err != nil:
+		return 0, false, fmt.Errorf("invalid %s property %q: %w", dvCardinalityProperty, raw, err)
+	case n < 0:
+		// A negative count is meaningless, and -1 in particular would
+		// alias DeserializeDV's skip-validation sentinel and silently
+		// disable the check this lookup exists to enable.
+		return 0, false, fmt.Errorf("%s property must be non-negative, got %d", dvCardinalityProperty, n)
+	}
+
+	return n, true, nil
}
diff --git a/table/dv/deletion_vector_test.go b/table/dv/deletion_vector_test.go
index 1ea87142..e6e68470 100644
--- a/table/dv/deletion_vector_test.go
+++ b/table/dv/deletion_vector_test.go
@@ -93,6 +93,15 @@ func wrapDVPayloadForTest(bitmapBytes []byte) []byte {
}
func writePuffinWithDVBlob(t *testing.T, dir string, dvBlobBytes []byte)
(string, puffin.BlobMetadata) {
+ return writePuffinWithDVBlobAndProps(t, dir, dvBlobBytes,
map[string]string{
+ "referenced-data-file": "s3://bucket/data/data-001.parquet",
+ })
+}
+
+// writePuffinWithDVBlobAndProps is the same as writePuffinWithDVBlob but with
+// caller-supplied blob properties — used by the cardinality-validation tests
+// to inject the spec-mandated `cardinality` property (or omit it).
+func writePuffinWithDVBlobAndProps(t *testing.T, dir string, dvBlobBytes
[]byte, props map[string]string) (string, puffin.BlobMetadata) {
t.Helper()
path := filepath.Join(dir, "test-dv.puffin")
@@ -108,9 +117,7 @@ func writePuffinWithDVBlob(t *testing.T, dir string,
dvBlobBytes []byte) (string
SnapshotID: -1,
SequenceNumber: -1,
Fields: []int32{2147483546},
- Properties: map[string]string{
- "referenced-data-file":
"s3://bucket/data/data-001.parquet",
- },
+ Properties: props,
}, dvBlobBytes)
require.NoError(t, err)
require.NoError(t, w.Finish())
@@ -380,3 +387,114 @@ func TestReadDVInvalidBlobRange(t *testing.T) {
_, err := ReadDV(iceio.LocalFS{}, newDVTestFile(path, 5, &offset,
&size))
assert.ErrorContains(t, err, "read DV blob at offset 0")
}
+
+// TestReadDVCardinalityValidation pins the spec-mandated check that the
+// puffin blob's `cardinality` property matches the bitmap's actual decoded
+// cardinality. Catches truncated/partially-overwritten blobs whose CRC still
+// validates over the bytes that are present (5 spec positions in the
+// reference fixture).
+func TestReadDVCardinalityValidation(t *testing.T) {
+ dvBlobBytes := readDVTestData(t,
"small-alternating-values-position-index.bin")
+
+ t.Run("matching cardinality passes", func(t *testing.T) {
+ dir := t.TempDir()
+ path, meta := writePuffinWithDVBlobAndProps(t, dir,
dvBlobBytes, map[string]string{
+ "referenced-data-file":
"s3://bucket/data/data-001.parquet",
+ "cardinality": "5",
+ })
+ offset, size := meta.Offset, meta.Length
+ bm, err := ReadDV(iceio.LocalFS{}, newDVTestFile(path, 5,
&offset, &size))
+ require.NoError(t, err)
+ assert.Equal(t, int64(5), bm.Cardinality())
+ })
+
+ t.Run("mismatched cardinality is rejected", func(t *testing.T) {
+ dir := t.TempDir()
+ path, meta := writePuffinWithDVBlobAndProps(t, dir,
dvBlobBytes, map[string]string{
+ "referenced-data-file":
"s3://bucket/data/data-001.parquet",
+ // Bitmap actually has 5 positions; claim 99.
+ "cardinality": "99",
+ })
+ offset, size := meta.Offset, meta.Length
+ _, err := ReadDV(iceio.LocalFS{}, newDVTestFile(path, 5,
&offset, &size))
+ require.Error(t, err)
+ // Pin the specific error path: DeserializeDV's "cardinality
+ // mismatch", not the helper's "invalid cardinality" parse
error.
+ assert.Contains(t, err.Error(), "cardinality mismatch")
+ })
+
+ t.Run("missing property is tolerated with a warning", func(t
*testing.T) {
+ // No `cardinality` property — spec deviation tolerated for read
+ // compatibility with third-party writers that omit it. ReadDV
logs
+ // a warning (see slog.Warn) and falls back to CRC-only
validation
+ // inside DeserializeDV. Strict enforcement is deferred until
the
+ // Go writer-side coverage lands.
+ dir := t.TempDir()
+ path, meta := writePuffinWithDVBlob(t, dir, dvBlobBytes)
+ offset, size := meta.Offset, meta.Length
+ bm, err := ReadDV(iceio.LocalFS{}, newDVTestFile(path, 5,
&offset, &size))
+ require.NoError(t, err)
+ assert.Equal(t, int64(5), bm.Cardinality())
+ })
+
+ t.Run("malformed property is rejected", func(t *testing.T) {
+ // Property present but unparseable — silently falling back to
-1
+ // would mask writer bugs, so this is a hard error.
+ dir := t.TempDir()
+ path, meta := writePuffinWithDVBlobAndProps(t, dir,
dvBlobBytes, map[string]string{
+ "referenced-data-file":
"s3://bucket/data/data-001.parquet",
+ "cardinality": "not-a-number",
+ })
+ offset, size := meta.Offset, meta.Length
+ _, err := ReadDV(iceio.LocalFS{}, newDVTestFile(path, 5,
&offset, &size))
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "invalid cardinality")
+ })
+
+ t.Run("negative cardinality is rejected (not silently treated as
skip-validation sentinel)", func(t *testing.T) {
+ // -1 happens to alias DeserializeDV's internal skip-validation
+ // sentinel; accepting it from blob properties would silently
+ // disable the very check this PR adds. The pre-pass rejects all
+ // negative values.
+ dir := t.TempDir()
+ path, meta := writePuffinWithDVBlobAndProps(t, dir,
dvBlobBytes, map[string]string{
+ "referenced-data-file":
"s3://bucket/data/data-001.parquet",
+ "cardinality": "-1",
+ })
+ offset, size := meta.Offset, meta.Length
+ _, err := ReadDV(iceio.LocalFS{}, newDVTestFile(path, 5,
&offset, &size))
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "must be non-negative")
+ })
+
+ t.Run("manifest offset not found in footer is rejected with a precise
error", func(t *testing.T) {
+ // Manifest entry points at an in-bounds offset that doesn't
match
+ // any footer blob's starting position. Surfaces as the helper's
+ // "no blob in puffin footer" error rather than the deeper CRC
+ // error from DeserializeDV — clearer signal for diagnosing
+ // manifest corruption.
+ dir := t.TempDir()
+ path, meta := writePuffinWithDVBlob(t, dir, dvBlobBytes)
+ // Offset shifted by 1 keeps the ReadAt range inside the blob
+ // region (no "extends into footer" error) while guaranteeing no
+ // footer blob starts at that exact offset.
+ wrongOffset, smallSize := meta.Offset+1, int64(8)
+ _, err := ReadDV(iceio.LocalFS{}, newDVTestFile(path, 5,
&wrongOffset, &smallSize))
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "no blob in puffin footer")
+ })
+
+ t.Run("manifest size disagrees with footer length at matching offset",
func(t *testing.T) {
+ // Same starting offset but a different length — a distinct
writer
+ // bug from "no blob at offset". The helper surfaces a precise
+ // "blob at offset N has length M, manifest says K" message
rather
+ // than the broader "no blob" path.
+ dir := t.TempDir()
+ path, meta := writePuffinWithDVBlob(t, dir, dvBlobBytes)
+ offset := meta.Offset
+ wrongSize := meta.Length - 1
+ _, err := ReadDV(iceio.LocalFS{}, newDVTestFile(path, 5,
&offset, &wrongSize))
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "manifest says")
+ })
+}