This is an automated email from the ASF dual-hosted git repository.

laskoviymishka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new ab1ce32e feat(table): add DV planning validations to match Java's DeleteFileIndex (#1050) (#1108)
ab1ce32e is described below

commit ab1ce32e712d0d34b7c8dea0fcb255eee8ddf253
Author: Tanmay Rauth <[email protected]>
AuthorDate: Fri May 22 18:16:51 2026 +0530

    feat(table): add DV planning validations to match Java's DeleteFileIndex (#1050) (#1108)
    
    Add sequence-number guard and reject multiple DVs per data file during
    scan planning, matching Java's DeleteFileIndex behavior.
    
    Closes: #1050
---
 table/dv_scan_planning_test.go | 138 +++++++++++++++++++++++++++++++----------
 table/scanner.go               |  43 ++++++++++---
 2 files changed, 140 insertions(+), 41 deletions(-)

diff --git a/table/dv_scan_planning_test.go b/table/dv_scan_planning_test.go
index 93278369..7e04910c 100644
--- a/table/dv_scan_planning_test.go
+++ b/table/dv_scan_planning_test.go
@@ -164,43 +164,35 @@ func TestDVMatchingToDataFiles(t *testing.T) {
        }
 
        snapshotID := int64(1)
+       seqNum := int64(1)
        dvEntries := []iceberg.ManifestEntry{
-               iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapshotID, 
nil, nil, dvForData1),
-               iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapshotID, 
nil, nil, dvForData2),
+               iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapshotID, 
&seqNum, nil, dvForData1),
+               iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapshotID, 
&seqNum, nil, dvForData2),
        }
 
-       // Match DVs against data-001 — should only get dv-001
-       var matched []iceberg.DataFile
-       for _, del := range dvEntries {
-               if del.DataFile().ReferencedDataFile() == nil {
-                       continue
-               }
-
-               if *del.DataFile().ReferencedDataFile() == dataFilePath {
-                       matched = append(matched, del.DataFile())
-               }
-       }
+       dvIndex, err := buildDVIndex(dvEntries)
+       assert.NoError(t, err)
+       assert.Len(t, dvIndex, 2)
 
+       // Match DVs against data-001 — should only get dv-001
+       dataEntry1 := iceberg.NewManifestEntry(iceberg.EntryStatusADDED, 
&snapshotID, &seqNum, nil,
+               &mockDataFile{path: dataFilePath, contentType: 
iceberg.EntryContentData})
+       matched := matchDVToData(dataEntry1, dvIndex)
        assert.Len(t, matched, 1)
        assert.Equal(t, dvForData1.path, matched[0].FilePath())
 
        // Match DVs against data-002 — should only get dv-002
-       var matched2 []iceberg.DataFile
-       for _, del := range dvEntries {
-               if del.DataFile().ReferencedDataFile() == nil {
-                       continue
-               }
-
-               if *del.DataFile().ReferencedDataFile() == otherDataFilePath {
-                       matched2 = append(matched2, del.DataFile())
-               }
-       }
-
+       dataEntry2 := iceberg.NewManifestEntry(iceberg.EntryStatusADDED, 
&snapshotID, &seqNum, nil,
+               &mockDataFile{path: otherDataFilePath, contentType: 
iceberg.EntryContentData})
+       matched2 := matchDVToData(dataEntry2, dvIndex)
        assert.Len(t, matched2, 1)
        assert.Equal(t, dvForData2.path, matched2[0].FilePath())
 }
 
 func TestDVMatchingNoMatch(t *testing.T) {
+       snapshotID := int64(1)
+       seqNum := int64(1)
+
        dvFile := &dvMockDataFile{
                mockDataFile: mockDataFile{
                        path:        "s3://bucket/data/dv-001.puffin",
@@ -211,23 +203,101 @@ func TestDVMatchingNoMatch(t *testing.T) {
                contentSizeInBytes: int64Ptr(128),
        }
 
+       dvEntries := []iceberg.ManifestEntry{
+               iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapshotID, 
&seqNum, nil, dvFile),
+       }
+
+       dvIndex, err := buildDVIndex(dvEntries)
+       assert.NoError(t, err)
+
+       dataEntry := iceberg.NewManifestEntry(iceberg.EntryStatusADDED, 
&snapshotID, &seqNum, nil,
+               &mockDataFile{path: "s3://bucket/data/data-001.parquet", 
contentType: iceberg.EntryContentData})
+       matched := matchDVToData(dataEntry, dvIndex)
+       assert.Empty(t, matched)
+}
+
+func TestBuildDVIndex_RejectsMultipleDVsPerDataFile(t *testing.T) {
+       dataFilePath := "s3://bucket/data/data-001.parquet"
+       snapshotID := int64(1)
+       seqNum := int64(1)
+
+       dv1 := &dvMockDataFile{
+               mockDataFile: mockDataFile{
+                       path:        "s3://bucket/data/dv-001.puffin",
+                       contentType: iceberg.EntryContentPosDeletes,
+               },
+               referencedDataFile: strPtr(dataFilePath),
+               contentOffset:      int64Ptr(0),
+               contentSizeInBytes: int64Ptr(128),
+       }
+
+       dv2 := &dvMockDataFile{
+               mockDataFile: mockDataFile{
+                       path:        "s3://bucket/data/dv-002.puffin",
+                       contentType: iceberg.EntryContentPosDeletes,
+               },
+               referencedDataFile: strPtr(dataFilePath),
+               contentOffset:      int64Ptr(256),
+               contentSizeInBytes: int64Ptr(64),
+       }
+
+       dvEntries := []iceberg.ManifestEntry{
+               iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapshotID, 
&seqNum, nil, dv1),
+               iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapshotID, 
&seqNum, nil, dv2),
+       }
+
+       _, err := buildDVIndex(dvEntries)
+       assert.Error(t, err)
+       assert.Contains(t, err.Error(), "can't index multiple deletion vectors")
+       assert.Contains(t, err.Error(), dataFilePath)
+}
+
+func TestMatchDVToData_SequenceNumberGuard(t *testing.T) {
+       dataFilePath := "s3://bucket/data/data-001.parquet"
        snapshotID := int64(1)
+
+       dvFile := &dvMockDataFile{
+               mockDataFile: mockDataFile{
+                       path:        "s3://bucket/data/dv-001.puffin",
+                       contentType: iceberg.EntryContentPosDeletes,
+               },
+               referencedDataFile: strPtr(dataFilePath),
+               contentOffset:      int64Ptr(0),
+               contentSizeInBytes: int64Ptr(128),
+       }
+
+       dvSeqNum := int64(5)
        dvEntries := []iceberg.ManifestEntry{
-               iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapshotID, 
nil, nil, dvFile),
+               iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapshotID, 
&dvSeqNum, nil, dvFile),
        }
 
-       var matched []iceberg.DataFile
-       for _, del := range dvEntries {
-               if del.DataFile().ReferencedDataFile() == nil {
-                       continue
-               }
+       dvIndex, err := buildDVIndex(dvEntries)
+       assert.NoError(t, err)
 
-               if *del.DataFile().ReferencedDataFile() == 
"s3://bucket/data/data-001.parquet" {
-                       matched = append(matched, del.DataFile())
-               }
+       tests := []struct {
+               name       string
+               dataSeqNum int64
+               expectDV   bool
+       }{
+               {"data seq < DV seq — DV applies", 3, true},
+               {"data seq == DV seq — DV applies", 5, true},
+               {"data seq > DV seq — DV skipped", 7, false},
        }
 
-       assert.Empty(t, matched)
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       seqNum := tt.dataSeqNum
+                       dataEntry := 
iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapshotID, &seqNum, nil,
+                               &mockDataFile{path: dataFilePath, contentType: 
iceberg.EntryContentData})
+                       matched := matchDVToData(dataEntry, dvIndex)
+                       if tt.expectDV {
+                               assert.Len(t, matched, 1)
+                               assert.Equal(t, dvFile.path, 
matched[0].FilePath())
+                       } else {
+                               assert.Empty(t, matched)
+                       }
+               })
+       }
 }
 
 func TestFileScanTask_DeletionVectorFilesField(t *testing.T) {
diff --git a/table/scanner.go b/table/scanner.go
index b5a9b15d..3e9c6e0e 100644
--- a/table/scanner.go
+++ b/table/scanner.go
@@ -429,6 +429,38 @@ func partitionsMatch(a, b map[int]any) bool {
        return true
 }
 
+// buildDVIndex indexes deletion vectors by the data file path they reference.
+// The spec requires at most one DV per data file; a second entry for the same
+// path is rejected with an error.
+func buildDVIndex(dvEntries []iceberg.ManifestEntry) 
(map[string]iceberg.ManifestEntry, error) {
+       dvIndex := make(map[string]iceberg.ManifestEntry, len(dvEntries))
+       for _, del := range dvEntries {
+               if ref := del.DataFile().ReferencedDataFile(); ref != nil {
+                       if _, exists := dvIndex[*ref]; exists {
+                               return nil, fmt.Errorf("can't index multiple 
deletion vectors for %s", *ref)
+                       }
+                       dvIndex[*ref] = del
+               }
+       }
+
+       return dvIndex, nil
+}
+
+// matchDVToData returns the deletion vector that applies to the given data
+// entry, if any. A DV applies only when the data file's sequence number is
+// less than or equal to the DV's sequence number.
+func matchDVToData(dataEntry iceberg.ManifestEntry, dvIndex 
map[string]iceberg.ManifestEntry) []iceberg.DataFile {
+       dvEntry, ok := dvIndex[dataEntry.DataFile().FilePath()]
+       if !ok {
+               return nil
+       }
+       if dataEntry.SequenceNum() <= dvEntry.SequenceNum() {
+               return []iceberg.DataFile{dvEntry.DataFile()}
+       }
+
+       return nil
+}
+
 // fetchPartitionSpecFilteredManifests retrieves the table's current snapshot,
 // fetches its manifest files, and applies partition-spec filters to remove 
irrelevant manifests.
 func (scan *Scan) fetchPartitionSpecFilteredManifests(ctx context.Context) 
([]iceberg.ManifestFile, error) {
@@ -567,12 +599,9 @@ func (scan *Scan) PlanFiles(ctx context.Context) 
([]FileScanTask, error) {
                return cmp.Compare(a.SequenceNum(), b.SequenceNum())
        })
 
-       // Index DVs by referenced data file path for O(1) lookup.
-       dvIndex := make(map[string][]iceberg.DataFile, len(entries.dvEntries))
-       for _, del := range entries.dvEntries {
-               if ref := del.DataFile().ReferencedDataFile(); ref != nil {
-                       dvIndex[*ref] = append(dvIndex[*ref], del.DataFile())
-               }
+       dvIndex, err := buildDVIndex(entries.dvEntries)
+       if err != nil {
+               return nil, err
        }
 
        results := make([]FileScanTask, 0, len(entries.dataEntries))
@@ -587,7 +616,7 @@ func (scan *Scan) PlanFiles(ctx context.Context) 
([]FileScanTask, error) {
                        File:                e.DataFile(),
                        DeleteFiles:         deleteFiles,
                        EqualityDeleteFiles: eqDeleteFiles,
-                       DeletionVectorFiles: dvIndex[e.DataFile().FilePath()],
+                       DeletionVectorFiles: matchDVToData(e, dvIndex),
                        Start:               0,
                        Length:              e.DataFile().FileSizeBytes(),
                }

Reply via email to