This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new fd089164 fix(reader): Support both position and equality delete files
on the same FileScanTask (#1778)
fd089164 is described below
commit fd089164bff19fdd4b0d297363fc7808b6f6ea18
Author: Matt Butrovich <[email protected]>
AuthorDate: Tue Nov 4 20:41:29 2025 -0500
fix(reader): Support both position and equality delete files on the same
FileScanTask (#1778)
## What issue does this PR close?
Partially addresses #1749.
## Rationale for this change
This PR fixes a bug in delete file loading when a `FileScanTask`
contains both positional and equality delete files. We hit this when
running the Iceberg Java test suite via Comet in
https://github.com/apache/datafusion-comet/pull/2528. The tests that
failed were:
```
TestSparkExecutorCache > testMergeOnReadUpdate()
TestSparkExecutorCache > testMergeOnReadMerge()
TestSparkExecutorCache > testMergeOnReadDelete()
```
**The Bug:**
The condition in `try_start_eq_del_load` (delete_filter.rs:71-73) was
inverted. It returned `None` when the equality delete file was not in
the cache, causing the loader to skip loading it. When
`build_equality_delete_predicate` was later called, it would fail with
"Missing predicate for equality delete file".
## What changes are included in this PR?
**The Fix:**
- Inverted the condition so it returns `None` when the file is already
in the cache (either still loading or already loaded), preventing
duplicate work across concurrent tasks
- When the file is not in the cache, mark it as Loading and proceed with
loading
**Additional Changes:**
- Added test case `test_load_deletes_with_mixed_types` that reproduces
the bug scenario
## Are these changes tested?
Yes, this PR includes a new unit test
`test_load_deletes_with_mixed_types` that:
- Creates a `FileScanTask` with both a positional delete file and an
equality delete file
- Verifies that `load_deletes` successfully processes both types
- Verifies that `build_equality_delete_predicate` succeeds without the
"Missing predicate" error
Beyond the unit test, the bug was originally hit when running the
Iceberg Java test suite via Comet in
https://github.com/apache/datafusion-comet/pull/2528, and I confirmed
that this change fixes the affected tests in Iceberg Java's suite.
The unit test fails before this fix and passes after it.
---
.../src/arrow/caching_delete_file_loader.rs | 113 +++++++++++++++++++++
crates/iceberg/src/arrow/delete_filter.rs | 4 +-
2 files changed, 116 insertions(+), 1 deletion(-)
diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs
b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
index 078635c9..8a3ab3a9 100644
--- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -788,4 +788,117 @@ mod tests {
assert_eq!(data_col.value(1), "d");
assert_eq!(data_col.value(2), "g");
}
+
+ /// Test loading a FileScanTask with BOTH positional and equality deletes.
+ /// Verifies the fix for the inverted condition that caused "Missing
predicate for equality delete file" errors.
+ #[tokio::test]
+ async fn test_load_deletes_with_mixed_types() {
+ use crate::scan::FileScanTask;
+ use crate::spec::{DataFileFormat, Schema};
+
+ let tmp_dir = TempDir::new().unwrap();
+ let table_location = tmp_dir.path();
+ let file_io =
FileIO::from_path(table_location.as_os_str().to_str().unwrap())
+ .unwrap()
+ .build()
+ .unwrap();
+
+ // Create the data file schema
+ let data_file_schema = Arc::new(
+ Schema::builder()
+ .with_fields(vec![
+ crate::spec::NestedField::optional(
+ 2,
+ "y",
+
crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long),
+ )
+ .into(),
+ crate::spec::NestedField::optional(
+ 3,
+ "z",
+
crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long),
+ )
+ .into(),
+ ])
+ .build()
+ .unwrap(),
+ );
+
+ // Write positional delete file
+ let positional_delete_schema =
crate::arrow::delete_filter::tests::create_pos_del_schema();
+ let file_path_values =
+ vec![format!("{}/data-1.parquet",
table_location.to_str().unwrap()); 4];
+ let file_path_col =
Arc::new(StringArray::from_iter_values(&file_path_values));
+ let pos_col = Arc::new(Int64Array::from_iter_values(vec![0i64, 1, 2,
3]));
+
+ let positional_deletes_to_write =
+ RecordBatch::try_new(positional_delete_schema.clone(), vec![
+ file_path_col,
+ pos_col,
+ ])
+ .unwrap();
+
+ let props = WriterProperties::builder()
+ .set_compression(Compression::SNAPPY)
+ .build();
+
+ let pos_del_path = format!("{}/pos-del-mixed.parquet",
table_location.to_str().unwrap());
+ let file = File::create(&pos_del_path).unwrap();
+ let mut writer = ArrowWriter::try_new(
+ file,
+ positional_deletes_to_write.schema(),
+ Some(props.clone()),
+ )
+ .unwrap();
+ writer.write(&positional_deletes_to_write).unwrap();
+ writer.close().unwrap();
+
+ // Write equality delete file
+ let eq_delete_path =
setup_write_equality_delete_file_1(table_location.to_str().unwrap());
+
+ // Create FileScanTask with BOTH positional and equality deletes
+ let pos_del = FileScanTaskDeleteFile {
+ file_path: pos_del_path,
+ file_type: DataContentType::PositionDeletes,
+ partition_spec_id: 0,
+ equality_ids: None,
+ };
+
+ let eq_del = FileScanTaskDeleteFile {
+ file_path: eq_delete_path.clone(),
+ file_type: DataContentType::EqualityDeletes,
+ partition_spec_id: 0,
+ equality_ids: Some(vec![2, 3]), // Only use field IDs that exist
in both schemas
+ };
+
+ let file_scan_task = FileScanTask {
+ start: 0,
+ length: 0,
+ record_count: None,
+ data_file_path: format!("{}/data-1.parquet",
table_location.to_str().unwrap()),
+ data_file_format: DataFileFormat::Parquet,
+ schema: data_file_schema.clone(),
+ project_field_ids: vec![2, 3],
+ predicate: None,
+ deletes: vec![pos_del, eq_del],
+ };
+
+ // Load the deletes - should handle both types without error
+ let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(),
10);
+ let delete_filter = delete_file_loader
+ .load_deletes(&file_scan_task.deletes, file_scan_task.schema_ref())
+ .await
+ .unwrap()
+ .unwrap();
+
+ // Verify both delete types can be processed together
+ let result = delete_filter
+ .build_equality_delete_predicate(&file_scan_task)
+ .await;
+ assert!(
+ result.is_ok(),
+ "Failed to build equality delete predicate: {:?}",
+ result.err()
+ );
+ }
}
diff --git a/crates/iceberg/src/arrow/delete_filter.rs
b/crates/iceberg/src/arrow/delete_filter.rs
index b853baa9..4250974b 100644
--- a/crates/iceberg/src/arrow/delete_filter.rs
+++ b/crates/iceberg/src/arrow/delete_filter.rs
@@ -68,10 +68,12 @@ impl DeleteFilter {
pub(crate) fn try_start_eq_del_load(&self, file_path: &str) ->
Option<Arc<Notify>> {
let mut state = self.state.write().unwrap();
- if !state.equality_deletes.contains_key(file_path) {
+ // Skip if already loaded/loading - another task owns it
+ if state.equality_deletes.contains_key(file_path) {
return None;
}
+ // Mark as loading to prevent duplicate work
let notifier = Arc::new(Notify::new());
state
.equality_deletes