This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 6bda65882 Add case-sensitive support for equality deletes in DeleteFilter (#1930)
6bda65882 is described below

commit 6bda658824f65c5f66494e6ae0d235fbe2edfe0c
Author: slfan1989 <[email protected]>
AuthorDate: Tue Dec 16 20:22:07 2025 +0800

    Add case-sensitive support for equality deletes in DeleteFilter (#1930)
---
 .../src/arrow/caching_delete_file_loader.rs        |  1 +
 crates/iceberg/src/arrow/delete_filter.rs          | 62 ++++++++++++++++++++--
 crates/iceberg/src/arrow/reader.rs                 | 15 ++++++
 crates/iceberg/src/scan/context.rs                 |  5 ++
 crates/iceberg/src/scan/mod.rs                     |  2 +
 crates/iceberg/src/scan/task.rs                    |  3 ++
 6 files changed, 85 insertions(+), 3 deletions(-)

diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
index 250fc5e8d..aceeae49f 100644
--- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -911,6 +911,7 @@ mod tests {
             partition: None,
             partition_spec: None,
             name_mapping: None,
+            case_sensitive: false,
         };
 
         // Load the deletes - should handle both types without error
diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs
index 14b5124ee..d05e02899 100644
--- a/crates/iceberg/src/arrow/delete_filter.rs
+++ b/crates/iceberg/src/arrow/delete_filter.rs
@@ -141,8 +141,8 @@ impl DeleteFilter {
             return Ok(None);
         }
 
-        // TODO: handle case-insensitive case
-        let bound_predicate = combined_predicate.bind(file_scan_task.schema.clone(), false)?;
+        let bound_predicate = combined_predicate
+            .bind(file_scan_task.schema.clone(), file_scan_task.case_sensitive)?;
         Ok(Some(bound_predicate))
     }
 
@@ -211,8 +211,9 @@ pub(crate) mod tests {
 
     use super::*;
     use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
+    use crate::expr::Reference;
     use crate::io::FileIO;
-    use crate::spec::{DataFileFormat, Schema};
+    use crate::spec::{DataFileFormat, Datum, NestedField, PrimitiveType, Schema, Type};
 
     type ArrowSchemaRef = Arc<ArrowSchema>;
 
@@ -344,6 +345,7 @@ pub(crate) mod tests {
                 partition: None,
                 partition_spec: None,
                 name_mapping: None,
+                case_sensitive: false,
             },
             FileScanTask {
                 start: 0,
@@ -358,6 +360,7 @@ pub(crate) mod tests {
                 partition: None,
                 partition_spec: None,
                 name_mapping: None,
+                case_sensitive: false,
             },
         ];
 
@@ -380,4 +383,57 @@ pub(crate) mod tests {
         ];
         Arc::new(arrow_schema::Schema::new(fields))
     }
+
+    #[tokio::test]
+    async fn test_build_equality_delete_predicate_case_sensitive() {
+        let schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "Id", Type::Primitive(PrimitiveType::Long)).into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        // ---------- fake FileScanTask ----------
+        let task = FileScanTask {
+            start: 0,
+            length: 0,
+            record_count: None,
+            data_file_path: "data.parquet".to_string(),
+            data_file_format: crate::spec::DataFileFormat::Parquet,
+            schema: schema.clone(),
+            project_field_ids: vec![],
+            predicate: None,
+            deletes: vec![FileScanTaskDeleteFile {
+                file_path: "eq-del.parquet".to_string(),
+                file_type: DataContentType::EqualityDeletes,
+                partition_spec_id: 0,
+                equality_ids: None,
+            }],
+            partition: None,
+            partition_spec: None,
+            name_mapping: None,
+            case_sensitive: true,
+        };
+
+        let filter = DeleteFilter::default();
+
+        // ---------- insert equality delete predicate ----------
+        let pred = Reference::new("id").equal_to(Datum::long(10));
+
+        let (tx, rx) = tokio::sync::oneshot::channel();
+        filter.insert_equality_delete("eq-del.parquet", rx);
+
+        tx.send(pred).unwrap();
+
+        // ---------- should FAIL ----------
+        let result = filter.build_equality_delete_predicate(&task).await;
+
+        assert!(
+            result.is_err(),
+            "case_sensitive=true should fail when column case mismatches"
+        );
+    }
 }
diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index 6209c1e26..f7f90663a 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -2082,6 +2082,7 @@ message schema {
                 partition: None,
                 partition_spec: None,
                 name_mapping: None,
+                case_sensitive: false,
             })]
             .into_iter(),
         )) as FileScanTaskStream;
@@ -2403,6 +2404,7 @@ message schema {
             partition: None,
             partition_spec: None,
             name_mapping: None,
+            case_sensitive: false,
         };
 
         // Task 2: read the second and third row groups
@@ -2419,6 +2421,7 @@ message schema {
             partition: None,
             partition_spec: None,
             name_mapping: None,
+            case_sensitive: false,
         };
 
         let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream;
@@ -2546,6 +2549,7 @@ message schema {
                 partition: None,
                 partition_spec: None,
                 name_mapping: None,
+                case_sensitive: false,
             })]
             .into_iter(),
         )) as FileScanTaskStream;
@@ -2717,6 +2721,7 @@ message schema {
             partition: None,
             partition_spec: None,
             name_mapping: None,
+            case_sensitive: false,
         };
 
         let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
@@ -2934,6 +2939,7 @@ message schema {
             partition: None,
             partition_spec: None,
             name_mapping: None,
+            case_sensitive: false,
         };
 
         let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
@@ -3144,6 +3150,7 @@ message schema {
             partition: None,
             partition_spec: None,
             name_mapping: None,
+            case_sensitive: false,
         };
 
         let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
@@ -3247,6 +3254,7 @@ message schema {
                 partition: None,
                 partition_spec: None,
                 name_mapping: None,
+                case_sensitive: false,
             })]
             .into_iter(),
         )) as FileScanTaskStream;
@@ -3344,6 +3352,7 @@ message schema {
                 partition: None,
                 partition_spec: None,
                 name_mapping: None,
+                case_sensitive: false,
             })]
             .into_iter(),
         )) as FileScanTaskStream;
@@ -3430,6 +3439,7 @@ message schema {
                 partition: None,
                 partition_spec: None,
                 name_mapping: None,
+                case_sensitive: false,
             })]
             .into_iter(),
         )) as FileScanTaskStream;
@@ -3530,6 +3540,7 @@ message schema {
                 partition: None,
                 partition_spec: None,
                 name_mapping: None,
+                case_sensitive: false,
             })]
             .into_iter(),
         )) as FileScanTaskStream;
@@ -3659,6 +3670,7 @@ message schema {
                 partition: None,
                 partition_spec: None,
                 name_mapping: None,
+                case_sensitive: false,
             })]
             .into_iter(),
         )) as FileScanTaskStream;
@@ -3755,6 +3767,7 @@ message schema {
                 partition: None,
                 partition_spec: None,
                 name_mapping: None,
+                case_sensitive: false,
             })]
             .into_iter(),
         )) as FileScanTaskStream;
@@ -3864,6 +3877,7 @@ message schema {
                 partition: None,
                 partition_spec: None,
                 name_mapping: None,
+                case_sensitive: false,
             })]
             .into_iter(),
         )) as FileScanTaskStream;
@@ -4003,6 +4017,7 @@ message schema {
                 partition: Some(partition_data),
                 partition_spec: Some(partition_spec),
                 name_mapping: None,
+                case_sensitive: false,
             })]
             .into_iter(),
         )) as FileScanTaskStream;
diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs
index f28b6b090..169d8e640 100644
--- a/crates/iceberg/src/scan/context.rs
+++ b/crates/iceberg/src/scan/context.rs
@@ -46,6 +46,7 @@ pub(crate) struct ManifestFileContext {
     snapshot_schema: SchemaRef,
     expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
     delete_file_index: DeleteFileIndex,
+    case_sensitive: bool,
 }
 
 /// Wraps a [`ManifestEntryRef`] alongside the objects that are needed
@@ -59,6 +60,7 @@ pub(crate) struct ManifestEntryContext {
     pub partition_spec_id: i32,
     pub snapshot_schema: SchemaRef,
     pub delete_file_index: DeleteFileIndex,
+    pub case_sensitive: bool,
 }
 
 impl ManifestFileContext {
@@ -89,6 +91,7 @@ impl ManifestFileContext {
                 bound_predicates: bound_predicates.clone(),
                 snapshot_schema: snapshot_schema.clone(),
                 delete_file_index: delete_file_index.clone(),
+                case_sensitive: self.case_sensitive,
             };
 
             sender
@@ -135,6 +138,7 @@ impl ManifestEntryContext {
             partition_spec: None,
             // TODO: Extract name_mapping from table metadata property "schema.name-mapping.default"
             name_mapping: None,
+            case_sensitive: self.case_sensitive,
         })
     }
 }
@@ -277,6 +281,7 @@ impl PlanContext {
             field_ids: self.field_ids.clone(),
             expression_evaluator_cache: self.expression_evaluator_cache.clone(),
             delete_file_index,
+            case_sensitive: self.case_sensitive,
         }
     }
 }
diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs
index 1f7fa50df..c055c12c9 100644
--- a/crates/iceberg/src/scan/mod.rs
+++ b/crates/iceberg/src/scan/mod.rs
@@ -1885,6 +1885,7 @@ pub mod tests {
             partition: None,
             partition_spec: None,
             name_mapping: None,
+            case_sensitive: false,
         };
         test_fn(task);
 
@@ -1902,6 +1903,7 @@ pub mod tests {
             partition: None,
             partition_spec: None,
             name_mapping: None,
+            case_sensitive: false,
         };
         test_fn(task);
     }
diff --git a/crates/iceberg/src/scan/task.rs b/crates/iceberg/src/scan/task.rs
index e1ef241a5..5349a9bdd 100644
--- a/crates/iceberg/src/scan/task.rs
+++ b/crates/iceberg/src/scan/task.rs
@@ -104,6 +104,9 @@ pub struct FileScanTask {
     #[serde(serialize_with = "serialize_not_implemented")]
     #[serde(deserialize_with = "deserialize_not_implemented")]
     pub name_mapping: Option<Arc<NameMapping>>,
+
+    /// Whether this scan task should treat column names as case-sensitive when binding predicates.
+    pub case_sensitive: bool,
 }
 
 impl FileScanTask {

Reply via email to