This is an automated email from the ASF dual-hosted git repository.
laskoviymishka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new ab1ce32e feat(table): add DV planning validations to match Java's
DeleteFileIndex (#1050) (#1108)
ab1ce32e is described below
commit ab1ce32e712d0d34b7c8dea0fcb255eee8ddf253
Author: Tanmay Rauth <[email protected]>
AuthorDate: Fri May 22 18:16:51 2026 +0530
feat(table): add DV planning validations to match Java's DeleteFileIndex (#1050) (#1108)
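Add a sequence-number guard to scan planning so that a deletion vector (DV)
is applied to a data file only when the data file's sequence number is less
than or equal to the DV's sequence number. Also reject more than one DV
referencing the same data file. Both checks aim to match Java's
DeleteFileIndex behavior.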
Closes: #1050
---
table/dv_scan_planning_test.go | 138 +++++++++++++++++++++++++++++++----------
table/scanner.go | 43 ++++++++++---
2 files changed, 140 insertions(+), 41 deletions(-)
diff --git a/table/dv_scan_planning_test.go b/table/dv_scan_planning_test.go
index 93278369..7e04910c 100644
--- a/table/dv_scan_planning_test.go
+++ b/table/dv_scan_planning_test.go
@@ -164,43 +164,35 @@ func TestDVMatchingToDataFiles(t *testing.T) {
}
snapshotID := int64(1)
+ seqNum := int64(1)
dvEntries := []iceberg.ManifestEntry{
- iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapshotID,
nil, nil, dvForData1),
- iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapshotID,
nil, nil, dvForData2),
+ iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapshotID,
&seqNum, nil, dvForData1),
+ iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapshotID,
&seqNum, nil, dvForData2),
}
- // Match DVs against data-001 — should only get dv-001
- var matched []iceberg.DataFile
- for _, del := range dvEntries {
- if del.DataFile().ReferencedDataFile() == nil {
- continue
- }
-
- if *del.DataFile().ReferencedDataFile() == dataFilePath {
- matched = append(matched, del.DataFile())
- }
- }
+ dvIndex, err := buildDVIndex(dvEntries)
+ assert.NoError(t, err)
+ assert.Len(t, dvIndex, 2)
+ // Match DVs against data-001 — should only get dv-001
+ dataEntry1 := iceberg.NewManifestEntry(iceberg.EntryStatusADDED,
&snapshotID, &seqNum, nil,
+ &mockDataFile{path: dataFilePath, contentType:
iceberg.EntryContentData})
+ matched := matchDVToData(dataEntry1, dvIndex)
assert.Len(t, matched, 1)
assert.Equal(t, dvForData1.path, matched[0].FilePath())
// Match DVs against data-002 — should only get dv-002
- var matched2 []iceberg.DataFile
- for _, del := range dvEntries {
- if del.DataFile().ReferencedDataFile() == nil {
- continue
- }
-
- if *del.DataFile().ReferencedDataFile() == otherDataFilePath {
- matched2 = append(matched2, del.DataFile())
- }
- }
-
+ dataEntry2 := iceberg.NewManifestEntry(iceberg.EntryStatusADDED,
&snapshotID, &seqNum, nil,
+ &mockDataFile{path: otherDataFilePath, contentType:
iceberg.EntryContentData})
+ matched2 := matchDVToData(dataEntry2, dvIndex)
assert.Len(t, matched2, 1)
assert.Equal(t, dvForData2.path, matched2[0].FilePath())
}
func TestDVMatchingNoMatch(t *testing.T) {
+ snapshotID := int64(1)
+ seqNum := int64(1)
+
dvFile := &dvMockDataFile{
mockDataFile: mockDataFile{
path: "s3://bucket/data/dv-001.puffin",
@@ -211,23 +203,101 @@ func TestDVMatchingNoMatch(t *testing.T) {
contentSizeInBytes: int64Ptr(128),
}
+ dvEntries := []iceberg.ManifestEntry{
+ iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapshotID,
&seqNum, nil, dvFile),
+ }
+
+ dvIndex, err := buildDVIndex(dvEntries)
+ assert.NoError(t, err)
+
+ dataEntry := iceberg.NewManifestEntry(iceberg.EntryStatusADDED,
&snapshotID, &seqNum, nil,
+ &mockDataFile{path: "s3://bucket/data/data-001.parquet",
contentType: iceberg.EntryContentData})
+ matched := matchDVToData(dataEntry, dvIndex)
+ assert.Empty(t, matched)
+}
+
+func TestBuildDVIndex_RejectsMultipleDVsPerDataFile(t *testing.T) {
+ dataFilePath := "s3://bucket/data/data-001.parquet"
+ snapshotID := int64(1)
+ seqNum := int64(1)
+
+ dv1 := &dvMockDataFile{
+ mockDataFile: mockDataFile{
+ path: "s3://bucket/data/dv-001.puffin",
+ contentType: iceberg.EntryContentPosDeletes,
+ },
+ referencedDataFile: strPtr(dataFilePath),
+ contentOffset: int64Ptr(0),
+ contentSizeInBytes: int64Ptr(128),
+ }
+
+ dv2 := &dvMockDataFile{
+ mockDataFile: mockDataFile{
+ path: "s3://bucket/data/dv-002.puffin",
+ contentType: iceberg.EntryContentPosDeletes,
+ },
+ referencedDataFile: strPtr(dataFilePath),
+ contentOffset: int64Ptr(256),
+ contentSizeInBytes: int64Ptr(64),
+ }
+
+ dvEntries := []iceberg.ManifestEntry{
+ iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapshotID,
&seqNum, nil, dv1),
+ iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapshotID,
&seqNum, nil, dv2),
+ }
+
+ _, err := buildDVIndex(dvEntries)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "can't index multiple deletion vectors")
+ assert.Contains(t, err.Error(), dataFilePath)
+}
+
+func TestMatchDVToData_SequenceNumberGuard(t *testing.T) {
+ dataFilePath := "s3://bucket/data/data-001.parquet"
snapshotID := int64(1)
+
+ dvFile := &dvMockDataFile{
+ mockDataFile: mockDataFile{
+ path: "s3://bucket/data/dv-001.puffin",
+ contentType: iceberg.EntryContentPosDeletes,
+ },
+ referencedDataFile: strPtr(dataFilePath),
+ contentOffset: int64Ptr(0),
+ contentSizeInBytes: int64Ptr(128),
+ }
+
+ dvSeqNum := int64(5)
dvEntries := []iceberg.ManifestEntry{
- iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapshotID,
nil, nil, dvFile),
+ iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapshotID,
&dvSeqNum, nil, dvFile),
}
- var matched []iceberg.DataFile
- for _, del := range dvEntries {
- if del.DataFile().ReferencedDataFile() == nil {
- continue
- }
+ dvIndex, err := buildDVIndex(dvEntries)
+ assert.NoError(t, err)
- if *del.DataFile().ReferencedDataFile() ==
"s3://bucket/data/data-001.parquet" {
- matched = append(matched, del.DataFile())
- }
+ tests := []struct {
+ name string
+ dataSeqNum int64
+ expectDV bool
+ }{
+ {"data seq < DV seq — DV applies", 3, true},
+ {"data seq == DV seq — DV applies", 5, true},
+ {"data seq > DV seq — DV skipped", 7, false},
}
- assert.Empty(t, matched)
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ seqNum := tt.dataSeqNum
+ dataEntry :=
iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapshotID, &seqNum, nil,
+ &mockDataFile{path: dataFilePath, contentType:
iceberg.EntryContentData})
+ matched := matchDVToData(dataEntry, dvIndex)
+ if tt.expectDV {
+ assert.Len(t, matched, 1)
+ assert.Equal(t, dvFile.path,
matched[0].FilePath())
+ } else {
+ assert.Empty(t, matched)
+ }
+ })
+ }
}
func TestFileScanTask_DeletionVectorFilesField(t *testing.T) {
diff --git a/table/scanner.go b/table/scanner.go
index b5a9b15d..3e9c6e0e 100644
--- a/table/scanner.go
+++ b/table/scanner.go
@@ -429,6 +429,38 @@ func partitionsMatch(a, b map[int]any) bool {
return true
}
+// buildDVIndex indexes deletion vectors by the data file path they reference.
+// The spec requires at most one DV per data file; a second entry for the same
+// path is rejected with an error.
+func buildDVIndex(dvEntries []iceberg.ManifestEntry)
(map[string]iceberg.ManifestEntry, error) {
+ dvIndex := make(map[string]iceberg.ManifestEntry, len(dvEntries))
+ for _, del := range dvEntries {
+ if ref := del.DataFile().ReferencedDataFile(); ref != nil {
+ if _, exists := dvIndex[*ref]; exists {
+ return nil, fmt.Errorf("can't index multiple
deletion vectors for %s", *ref)
+ }
+ dvIndex[*ref] = del
+ }
+ }
+
+ return dvIndex, nil
+}
+
+// matchDVToData returns the deletion vector that applies to the given data
+// entry, if any. A DV applies only when the data file's sequence number is
+// less than or equal to the DV's sequence number.
+func matchDVToData(dataEntry iceberg.ManifestEntry, dvIndex
map[string]iceberg.ManifestEntry) []iceberg.DataFile {
+ dvEntry, ok := dvIndex[dataEntry.DataFile().FilePath()]
+ if !ok {
+ return nil
+ }
+ if dataEntry.SequenceNum() <= dvEntry.SequenceNum() {
+ return []iceberg.DataFile{dvEntry.DataFile()}
+ }
+
+ return nil
+}
+
// fetchPartitionSpecFilteredManifests retrieves the table's current snapshot,
// fetches its manifest files, and applies partition-spec filters to remove
irrelevant manifests.
func (scan *Scan) fetchPartitionSpecFilteredManifests(ctx context.Context)
([]iceberg.ManifestFile, error) {
@@ -567,12 +599,9 @@ func (scan *Scan) PlanFiles(ctx context.Context)
([]FileScanTask, error) {
return cmp.Compare(a.SequenceNum(), b.SequenceNum())
})
- // Index DVs by referenced data file path for O(1) lookup.
- dvIndex := make(map[string][]iceberg.DataFile, len(entries.dvEntries))
- for _, del := range entries.dvEntries {
- if ref := del.DataFile().ReferencedDataFile(); ref != nil {
- dvIndex[*ref] = append(dvIndex[*ref], del.DataFile())
- }
+ dvIndex, err := buildDVIndex(entries.dvEntries)
+ if err != nil {
+ return nil, err
}
results := make([]FileScanTask, 0, len(entries.dataEntries))
@@ -587,7 +616,7 @@ func (scan *Scan) PlanFiles(ctx context.Context)
([]FileScanTask, error) {
File: e.DataFile(),
DeleteFiles: deleteFiles,
EqualityDeleteFiles: eqDeleteFiles,
- DeletionVectorFiles: dvIndex[e.DataFile().FilePath()],
+ DeletionVectorFiles: matchDVToData(e, dvIndex),
Start: 0,
Length: e.DataFile().FileSizeBytes(),
}