This is an automated email from the ASF dual-hosted git repository.

kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new d33f3bb7 fix: global eq delete matching should apply to only strictly 
older files, and fix partition scoped matching to consider spec id (#1758)
d33f3bb7 is described below

commit d33f3bb77ede1bf481bf71d9ddb45cb4cdcbd858
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Mon Oct 20 10:48:51 2025 -0600

    fix: global eq delete matching should apply to only strictly older files, 
and fix partition scoped matching to consider spec id (#1758)
    
    ## Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax. For example
    `Closes #123` indicates that this PR will close issue #123.
    -->
    
    - Closes #1759.
    
    ## What changes are included in this PR?
    
    This changes scan planning of equality deletes so that:
    1. a data file is only matched against eq deletes which are strictly
    newer than it (i.e. the delete's sequence number is greater than the
    data file's). Right now it looks like we incorrectly over-apply based on
    the seq number for global equality deletes.
    
    2. Partition-scoped deletes (both equality and position) are compared
    correctly by also factoring in the spec ID. Comparing the partition
    tuple alone is not enough; the spec ID of the delete file must match
    that of the data file as well.
    
    ## Are these changes tested?
    
    Added unit tests which are scoped to testing delete index matching
    logic.
    
    <!--
    If tests are not included in your PR, please explain why (for example,
    are they covered by existing tests)?
    -->
---
 crates/iceberg/src/delete_file_index.rs | 292 +++++++++++++++++++++++++++++++-
 1 file changed, 283 insertions(+), 9 deletions(-)

diff --git a/crates/iceberg/src/delete_file_index.rs 
b/crates/iceberg/src/delete_file_index.rs
index d8f7a872..4f6fd284 100644
--- a/crates/iceberg/src/delete_file_index.rs
+++ b/crates/iceberg/src/delete_file_index.rs
@@ -42,7 +42,7 @@ enum DeleteFileIndexState {
 #[derive(Debug)]
 struct PopulatedDeleteFileIndex {
     #[allow(dead_code)]
-    global_deletes: Vec<Arc<DeleteFileContext>>,
+    global_equality_deletes: Vec<Arc<DeleteFileContext>>,
     eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
     pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
     // TODO: do we need this?
@@ -65,7 +65,8 @@ impl DeleteFileIndex {
         spawn({
             let state = state.clone();
             async move {
-                let delete_files = 
delete_file_stream.collect::<Vec<_>>().await;
+                let delete_files: Vec<DeleteFileContext> =
+                    delete_file_stream.collect::<Vec<_>>().await;
 
                 let populated_delete_file_index = 
PopulatedDeleteFileIndex::new(delete_files);
 
@@ -114,7 +115,7 @@ impl PopulatedDeleteFileIndex {
     ///
     /// 1. The partition information is extracted from each delete file's 
manifest entry.
     /// 2. If the partition is empty and the delete file is not a positional 
delete,
-    ///    it is added to the `global_deletes` vector
+    ///    it is added to the `global_equality_deletes` vector
     /// 3. Otherwise, the delete file is added to one of two hash maps based 
on its content type.
     fn new(files: Vec<DeleteFileContext>) -> PopulatedDeleteFileIndex {
         let mut eq_deletes_by_partition: HashMap<Struct, 
Vec<Arc<DeleteFileContext>>> =
@@ -122,7 +123,7 @@ impl PopulatedDeleteFileIndex {
         let mut pos_deletes_by_partition: HashMap<Struct, 
Vec<Arc<DeleteFileContext>>> =
             HashMap::default();
 
-        let mut global_deletes: Vec<Arc<DeleteFileContext>> = vec![];
+        let mut global_equality_deletes: Vec<Arc<DeleteFileContext>> = vec![];
 
         files.into_iter().for_each(|ctx| {
             let arc_ctx = Arc::new(ctx);
@@ -133,7 +134,7 @@ impl PopulatedDeleteFileIndex {
             if partition.fields().is_empty() {
                 // TODO: confirm we're good to skip here if we encounter a pos 
del
                 if arc_ctx.manifest_entry.content_type() != 
DataContentType::PositionDeletes {
-                    global_deletes.push(arc_ctx);
+                    global_equality_deletes.push(arc_ctx);
                     return;
                 }
             }
@@ -153,7 +154,7 @@ impl PopulatedDeleteFileIndex {
         });
 
         PopulatedDeleteFileIndex {
-            global_deletes,
+            global_equality_deletes,
             eq_deletes_by_partition,
             pos_deletes_by_partition,
         }
@@ -167,12 +168,12 @@ impl PopulatedDeleteFileIndex {
     ) -> Vec<FileScanTaskDeleteFile> {
         let mut results = vec![];
 
-        self.global_deletes
+        self.global_equality_deletes
             .iter()
-            // filter that returns true if the provided delete file's sequence 
number is **greater than or equal to** `seq_num`
+            // filter that returns true if the provided delete file's sequence 
number is **greater than** `seq_num`
             .filter(|&delete| {
                 seq_num
-                    .map(|seq_num| delete.manifest_entry.sequence_number() >= 
Some(seq_num))
+                    .map(|seq_num| delete.manifest_entry.sequence_number() > 
Some(seq_num))
                     .unwrap_or_else(|| true)
             })
             .for_each(|delete| results.push(delete.as_ref().into()));
@@ -185,6 +186,7 @@ impl PopulatedDeleteFileIndex {
                     seq_num
                         .map(|seq_num| delete.manifest_entry.sequence_number() 
> Some(seq_num))
                         .unwrap_or_else(|| true)
+                        && data_file.partition_spec_id == 
delete.partition_spec_id
                 })
                 .for_each(|delete| results.push(delete.as_ref().into()));
         }
@@ -201,6 +203,7 @@ impl PopulatedDeleteFileIndex {
                     seq_num
                         .map(|seq_num| delete.manifest_entry.sequence_number() 
>= Some(seq_num))
                         .unwrap_or_else(|| true)
+                        && data_file.partition_spec_id == 
delete.partition_spec_id
                 })
                 .for_each(|delete| results.push(delete.as_ref().into()));
         }
@@ -208,3 +211,274 @@ impl PopulatedDeleteFileIndex {
         results
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use uuid::Uuid;
+
+    use super::*;
+    use crate::spec::{
+        DataContentType, DataFileBuilder, DataFileFormat, Literal, 
ManifestEntry, ManifestStatus,
+        Struct,
+    };
+
+    #[test]
+    fn test_delete_file_index_unpartitioned() {
+        let deletes: Vec<ManifestEntry> = vec![
+            build_added_manifest_entry(4, &build_unpartitioned_eq_delete()),
+            build_added_manifest_entry(6, &build_unpartitioned_eq_delete()),
+            build_added_manifest_entry(5, &build_unpartitioned_pos_delete()),
+            build_added_manifest_entry(6, &build_unpartitioned_pos_delete()),
+        ];
+
+        let delete_file_paths: Vec<String> = deletes
+            .iter()
+            .map(|file| file.file_path().to_string())
+            .collect();
+
+        let delete_contexts: Vec<DeleteFileContext> = deletes
+            .into_iter()
+            .map(|entry| DeleteFileContext {
+                manifest_entry: entry.into(),
+                partition_spec_id: 0,
+            })
+            .collect();
+
+        let delete_file_index = PopulatedDeleteFileIndex::new(delete_contexts);
+
+        let data_file = build_unpartitioned_data_file();
+
+        // All deletes apply to sequence 0
+        let delete_files_to_apply_for_seq_0 =
+            delete_file_index.get_deletes_for_data_file(&data_file, Some(0));
+        assert_eq!(delete_files_to_apply_for_seq_0.len(), 4);
+
+        // All deletes apply to sequence 3
+        let delete_files_to_apply_for_seq_3 =
+            delete_file_index.get_deletes_for_data_file(&data_file, Some(3));
+        assert_eq!(delete_files_to_apply_for_seq_3.len(), 4);
+
+        // Last 3 deletes apply to sequence 4
+        let delete_files_to_apply_for_seq_4 =
+            delete_file_index.get_deletes_for_data_file(&data_file, Some(4));
+        let actual_paths_to_apply_for_seq_4: Vec<String> = 
delete_files_to_apply_for_seq_4
+            .into_iter()
+            .map(|file| file.file_path)
+            .collect();
+
+        assert_eq!(
+            actual_paths_to_apply_for_seq_4,
+            delete_file_paths[delete_file_paths.len() - 3..]
+        );
+
+        // Last 3 deletes apply to sequence 5
+        let delete_files_to_apply_for_seq_5 =
+            delete_file_index.get_deletes_for_data_file(&data_file, Some(5));
+        let actual_paths_to_apply_for_seq_5: Vec<String> = 
delete_files_to_apply_for_seq_5
+            .into_iter()
+            .map(|file| file.file_path)
+            .collect();
+        assert_eq!(
+            actual_paths_to_apply_for_seq_5,
+            delete_file_paths[delete_file_paths.len() - 3..]
+        );
+
+        // Only the last position delete applies to sequence 6
+        let delete_files_to_apply_for_seq_6 =
+            delete_file_index.get_deletes_for_data_file(&data_file, Some(6));
+        let actual_paths_to_apply_for_seq_6: Vec<String> = 
delete_files_to_apply_for_seq_6
+            .into_iter()
+            .map(|file| file.file_path)
+            .collect();
+        assert_eq!(
+            actual_paths_to_apply_for_seq_6,
+            delete_file_paths[delete_file_paths.len() - 1..]
+        );
+
+        // The 2 global equality deletes should match against any partitioned 
file
+        let partitioned_file =
+            
build_partitioned_data_file(&Struct::from_iter([Some(Literal::long(100))]), 1);
+
+        let delete_files_to_apply_for_partitioned_file =
+            delete_file_index.get_deletes_for_data_file(&partitioned_file, 
Some(0));
+        let actual_paths_to_apply_for_partitioned_file: Vec<String> =
+            delete_files_to_apply_for_partitioned_file
+                .into_iter()
+                .map(|file| file.file_path)
+                .collect();
+        assert_eq!(
+            actual_paths_to_apply_for_partitioned_file,
+            delete_file_paths[..2]
+        );
+    }
+
+    #[test]
+    fn test_delete_file_index_partitioned() {
+        let partition_one = Struct::from_iter([Some(Literal::long(100))]);
+        let spec_id = 1;
+        let deletes: Vec<ManifestEntry> = vec![
+            build_added_manifest_entry(4, 
&build_partitioned_eq_delete(&partition_one, spec_id)),
+            build_added_manifest_entry(6, 
&build_partitioned_eq_delete(&partition_one, spec_id)),
+            build_added_manifest_entry(5, 
&build_partitioned_pos_delete(&partition_one, spec_id)),
+            build_added_manifest_entry(6, 
&build_partitioned_pos_delete(&partition_one, spec_id)),
+        ];
+
+        let delete_file_paths: Vec<String> = deletes
+            .iter()
+            .map(|file| file.file_path().to_string())
+            .collect();
+
+        let delete_contexts: Vec<DeleteFileContext> = deletes
+            .into_iter()
+            .map(|entry| DeleteFileContext {
+                manifest_entry: entry.into(),
+                partition_spec_id: spec_id,
+            })
+            .collect();
+
+        let delete_file_index = PopulatedDeleteFileIndex::new(delete_contexts);
+
+        let partitioned_file =
+            
build_partitioned_data_file(&Struct::from_iter([Some(Literal::long(100))]), 
spec_id);
+
+        // All deletes apply to sequence 0
+        let delete_files_to_apply_for_seq_0 =
+            delete_file_index.get_deletes_for_data_file(&partitioned_file, 
Some(0));
+        assert_eq!(delete_files_to_apply_for_seq_0.len(), 4);
+
+        // All deletes apply to sequence 3
+        let delete_files_to_apply_for_seq_3 =
+            delete_file_index.get_deletes_for_data_file(&partitioned_file, 
Some(3));
+        assert_eq!(delete_files_to_apply_for_seq_3.len(), 4);
+
+        // Last 3 deletes apply to sequence 4
+        let delete_files_to_apply_for_seq_4 =
+            delete_file_index.get_deletes_for_data_file(&partitioned_file, 
Some(4));
+        let actual_paths_to_apply_for_seq_4: Vec<String> = 
delete_files_to_apply_for_seq_4
+            .into_iter()
+            .map(|file| file.file_path)
+            .collect();
+
+        assert_eq!(
+            actual_paths_to_apply_for_seq_4,
+            delete_file_paths[delete_file_paths.len() - 3..]
+        );
+
+        // Last 3 deletes apply to sequence 5
+        let delete_files_to_apply_for_seq_5 =
+            delete_file_index.get_deletes_for_data_file(&partitioned_file, 
Some(5));
+        let actual_paths_to_apply_for_seq_5: Vec<String> = 
delete_files_to_apply_for_seq_5
+            .into_iter()
+            .map(|file| file.file_path)
+            .collect();
+        assert_eq!(
+            actual_paths_to_apply_for_seq_5,
+            delete_file_paths[delete_file_paths.len() - 3..]
+        );
+
+        // Only the last position delete applies to sequence 6
+        let delete_files_to_apply_for_seq_6 =
+            delete_file_index.get_deletes_for_data_file(&partitioned_file, 
Some(6));
+        let actual_paths_to_apply_for_seq_6: Vec<String> = 
delete_files_to_apply_for_seq_6
+            .into_iter()
+            .map(|file| file.file_path)
+            .collect();
+        assert_eq!(
+            actual_paths_to_apply_for_seq_6,
+            delete_file_paths[delete_file_paths.len() - 1..]
+        );
+
+        // Data file with different partition tuples does not match any delete 
files
+        let partitioned_second_file =
+            
build_partitioned_data_file(&Struct::from_iter([Some(Literal::long(200))]), 1);
+        let delete_files_to_apply_for_different_partition =
+            
delete_file_index.get_deletes_for_data_file(&partitioned_second_file, Some(0));
+        let actual_paths_to_apply_for_different_partition: Vec<String> =
+            delete_files_to_apply_for_different_partition
+                .into_iter()
+                .map(|file| file.file_path)
+                .collect();
+        assert!(actual_paths_to_apply_for_different_partition.is_empty());
+
+        // Data file with same tuple but different spec ID does not match any 
delete files
+        let partitioned_different_spec = 
build_partitioned_data_file(&partition_one, 2);
+        let delete_files_to_apply_for_different_spec =
+            
delete_file_index.get_deletes_for_data_file(&partitioned_different_spec, 
Some(0));
+        let actual_paths_to_apply_for_different_spec: Vec<String> =
+            delete_files_to_apply_for_different_spec
+                .into_iter()
+                .map(|file| file.file_path)
+                .collect();
+        assert!(actual_paths_to_apply_for_different_spec.is_empty());
+    }
+
+    fn build_unpartitioned_eq_delete() -> DataFile {
+        build_partitioned_eq_delete(&Struct::empty(), 0)
+    }
+
+    fn build_partitioned_eq_delete(partition: &Struct, spec_id: i32) -> 
DataFile {
+        DataFileBuilder::default()
+            .file_path(format!("{}_equality_delete.parquet", Uuid::new_v4()))
+            .file_format(DataFileFormat::Parquet)
+            .content(DataContentType::EqualityDeletes)
+            .equality_ids(Some(vec![1]))
+            .record_count(1)
+            .partition(partition.clone())
+            .partition_spec_id(spec_id)
+            .file_size_in_bytes(100)
+            .build()
+            .unwrap()
+    }
+
+    fn build_unpartitioned_pos_delete() -> DataFile {
+        build_partitioned_pos_delete(&Struct::empty(), 0)
+    }
+
+    fn build_partitioned_pos_delete(partition: &Struct, spec_id: i32) -> 
DataFile {
+        DataFileBuilder::default()
+            .file_path(format!("{}-pos-delete.parquet", Uuid::new_v4()))
+            .file_format(DataFileFormat::Parquet)
+            .content(DataContentType::PositionDeletes)
+            .record_count(1)
+            .referenced_data_file(Some("/some-data-file.parquet".to_string()))
+            .partition(partition.clone())
+            .partition_spec_id(spec_id)
+            .file_size_in_bytes(100)
+            .build()
+            .unwrap()
+    }
+
+    fn build_unpartitioned_data_file() -> DataFile {
+        DataFileBuilder::default()
+            .file_path(format!("{}-data.parquet", Uuid::new_v4()))
+            .file_format(DataFileFormat::Parquet)
+            .content(DataContentType::Data)
+            .record_count(100)
+            .partition(Struct::empty())
+            .partition_spec_id(0)
+            .file_size_in_bytes(100)
+            .build()
+            .unwrap()
+    }
+
+    fn build_partitioned_data_file(partition_value: &Struct, spec_id: i32) -> 
DataFile {
+        DataFileBuilder::default()
+            .file_path(format!("{}-data.parquet", Uuid::new_v4()))
+            .file_format(DataFileFormat::Parquet)
+            .content(DataContentType::Data)
+            .record_count(100)
+            .partition(partition_value.clone())
+            .partition_spec_id(spec_id)
+            .file_size_in_bytes(100)
+            .build()
+            .unwrap()
+    }
+
+    fn build_added_manifest_entry(data_seq_number: i64, file: &DataFile) -> 
ManifestEntry {
+        ManifestEntry::builder()
+            .status(ManifestStatus::Added)
+            .sequence_number(data_seq_number)
+            .data_file(file.clone())
+            .build()
+    }
+}

Reply via email to