This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 26a7606e0 fix: prioritize delete manifests to prevent scan deadlock
(#1937)
26a7606e0 is described below
commit 26a7606e0f04d5175e452a809c7bdceb47ba2c07
Author: Lo <[email protected]>
AuthorDate: Mon Dec 15 18:30:10 2025 +0800
fix: prioritize delete manifests to prevent scan deadlock (#1937)
## Which issue does this PR close?
- Closes #. (no specific issue linked; fix submitted directly as PR #1937)
## What changes are included in this PR?
This change ensures that delete manifests are processed before data
manifests during the table scan planning phase.
Previously, if data manifests were processed first and produced enough
entries to fill the channel, the producer would block. Since the delete
manifest consumer might still be waiting for its entries (which hadn't
been produced yet), this could lead to a deadlock. Prioritizing delete
manifests ensures the delete consumer can proceed, allowing the data
consumer to eventually drain the channel.
## Are these changes tested?
Added a reproduction test case `test_scan_deadlock` to verify the fix.
---
crates/iceberg/src/scan/context.rs | 12 +++-
crates/iceberg/src/scan/mod.rs | 125 +++++++++++++++++++++++++++++++++++++
2 files changed, 136 insertions(+), 1 deletion(-)
diff --git a/crates/iceberg/src/scan/context.rs
b/crates/iceberg/src/scan/context.rs
index fe3f5c8f7..f28b6b090 100644
--- a/crates/iceberg/src/scan/context.rs
+++ b/crates/iceberg/src/scan/context.rs
@@ -194,7 +194,17 @@ impl PlanContext {
delete_file_idx: DeleteFileIndex,
delete_file_tx: Sender<ManifestEntryContext>,
) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>> +
'static>> {
- let manifest_files = manifest_list.entries().iter();
+ let mut manifest_files =
manifest_list.entries().iter().collect::<Vec<_>>();
+ // Sort manifest files to process delete manifests first.
+ // This avoids a deadlock where the producer blocks on sending data
manifest entries
+ // (because the data channel is full) while the delete manifest
consumer is waiting
+ // for delete manifest entries (which haven't been produced yet).
+ // By processing delete manifests first, we ensure the delete consumer
can finish,
+ // which then allows the data consumer to start draining the data
channel.
+ manifest_files.sort_by_key(|m| match m.content {
+ ManifestContentType::Deletes => 0,
+ ManifestContentType::Data => 1,
+ });
// TODO: Ideally we could ditch this intermediate Vec as we return an
iterator.
let mut filtered_mfcs = vec![];
diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs
index d83da8a87..1f7fa50df 100644
--- a/crates/iceberg/src/scan/mod.rs
+++ b/crates/iceberg/src/scan/mod.rs
@@ -1170,6 +1170,97 @@ pub mod tests {
writer.close().unwrap();
}
}
+
+ pub async fn setup_deadlock_manifests(&mut self) {
+ let current_snapshot =
self.table.metadata().current_snapshot().unwrap();
+ let _parent_snapshot = current_snapshot
+ .parent_snapshot(self.table.metadata())
+ .unwrap();
+ let current_schema =
current_snapshot.schema(self.table.metadata()).unwrap();
+ let current_partition_spec =
self.table.metadata().default_partition_spec();
+
+ // 1. Write DATA manifest with MULTIPLE entries to fill buffer
+ let mut writer = ManifestWriterBuilder::new(
+ self.next_manifest_file(),
+ Some(current_snapshot.snapshot_id()),
+ None,
+ current_schema.clone(),
+ current_partition_spec.as_ref().clone(),
+ )
+ .build_v2_data();
+
+ // Add 10 data entries
+ for i in 0..10 {
+ writer
+ .add_entry(
+ ManifestEntry::builder()
+ .status(ManifestStatus::Added)
+ .data_file(
+ DataFileBuilder::default()
+ .partition_spec_id(0)
+ .content(DataContentType::Data)
+ .file_path(format!("{}/{}.parquet",
&self.table_location, i))
+ .file_format(DataFileFormat::Parquet)
+ .file_size_in_bytes(100)
+ .record_count(1)
+
.partition(Struct::from_iter([Some(Literal::long(100))]))
+ .key_metadata(None)
+ .build()
+ .unwrap(),
+ )
+ .build(),
+ )
+ .unwrap();
+ }
+ let data_manifest = writer.write_manifest_file().await.unwrap();
+
+ // 2. Write DELETE manifest
+ let mut writer = ManifestWriterBuilder::new(
+ self.next_manifest_file(),
+ Some(current_snapshot.snapshot_id()),
+ None,
+ current_schema.clone(),
+ current_partition_spec.as_ref().clone(),
+ )
+ .build_v2_deletes();
+
+ writer
+ .add_entry(
+ ManifestEntry::builder()
+ .status(ManifestStatus::Added)
+ .data_file(
+ DataFileBuilder::default()
+ .partition_spec_id(0)
+ .content(DataContentType::PositionDeletes)
+ .file_path(format!("{}/del.parquet",
&self.table_location))
+ .file_format(DataFileFormat::Parquet)
+ .file_size_in_bytes(100)
+ .record_count(1)
+
.partition(Struct::from_iter([Some(Literal::long(100))]))
+ .build()
+ .unwrap(),
+ )
+ .build(),
+ )
+ .unwrap();
+ let delete_manifest = writer.write_manifest_file().await.unwrap();
+
+ // Write to manifest list - DATA FIRST then DELETE
+ // This order is crucial for reproduction
+ let mut manifest_list_write = ManifestListWriter::v2(
+ self.table
+ .file_io()
+ .new_output(current_snapshot.manifest_list())
+ .unwrap(),
+ current_snapshot.snapshot_id(),
+ current_snapshot.parent_snapshot_id(),
+ current_snapshot.sequence_number(),
+ );
+ manifest_list_write
+ .add_manifests(vec![data_manifest,
delete_manifest].into_iter())
+ .unwrap();
+ manifest_list_write.close().await.unwrap();
+ }
}
#[test]
@@ -2127,4 +2218,38 @@ pub mod tests {
"_file column (duplicate) should use RunEndEncoded type"
);
}
+
+ #[tokio::test]
+ async fn test_scan_deadlock() {
+ let mut fixture = TableTestFixture::new();
+ fixture.setup_deadlock_manifests().await;
+
+ // Create table scan with concurrency limit 1
+ // This sets channel size to 1.
+ // Data manifest has 10 entries -> will block producer.
+ // Delete manifest is 2nd in list -> won't be processed.
+ // Consumer 2 (Data) not started -> blocked.
+ // Consumer 1 (Delete) waiting -> blocked.
+ let table_scan = fixture
+ .table
+ .scan()
+ .with_concurrency_limit(1)
+ .build()
+ .unwrap();
+
+ // This should timeout/hang if deadlock exists
+ // We can use tokio::time::timeout
+ let result = tokio::time::timeout(std::time::Duration::from_secs(5),
async {
+ table_scan
+ .plan_files()
+ .await
+ .unwrap()
+ .try_collect::<Vec<_>>()
+ .await
+ })
+ .await;
+
+ // Assert it finished (didn't timeout)
+ assert!(result.is_ok(), "Scan timed out - deadlock detected");
+ }
}