This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 26a7606e0 fix: prioritize delete manifests to prevent scan deadlock (#1937)
26a7606e0 is described below

commit 26a7606e0f04d5175e452a809c7bdceb47ba2c07
Author: Lo <[email protected]>
AuthorDate: Mon Dec 15 18:30:10 2025 +0800

    fix: prioritize delete manifests to prevent scan deadlock (#1937)
    
    ## Which issue does this PR close?
    
    - Closes #.
    
    ## What changes are included in this PR?
    
    This change ensures that delete manifests are processed before data
    manifests during the table scan planning phase.
    Previously, if data manifests were processed first and produced enough
    entries to fill the channel, the producer would block. Since the delete
    manifest consumer might still be waiting for its entries (which hadn't
    been produced yet), this could lead to a deadlock. Prioritizing delete
    manifests ensures the delete consumer can proceed, allowing the data
    consumer to eventually drain the channel.
    
    ## Are these changes tested?
    
    Added a reproduction test case `test_scan_deadlock` to verify the fix.
---
 crates/iceberg/src/scan/context.rs |  12 +++-
 crates/iceberg/src/scan/mod.rs     | 125 +++++++++++++++++++++++++++++++++++++
 2 files changed, 136 insertions(+), 1 deletion(-)

diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs
index fe3f5c8f7..f28b6b090 100644
--- a/crates/iceberg/src/scan/context.rs
+++ b/crates/iceberg/src/scan/context.rs
@@ -194,7 +194,17 @@ impl PlanContext {
         delete_file_idx: DeleteFileIndex,
         delete_file_tx: Sender<ManifestEntryContext>,
     ) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>> + 'static>> {
-        let manifest_files = manifest_list.entries().iter();
+        let mut manifest_files = manifest_list.entries().iter().collect::<Vec<_>>();
+        // Sort manifest files to process delete manifests first.
+        // This avoids a deadlock where the producer blocks on sending data manifest entries
+        // (because the data channel is full) while the delete manifest consumer is waiting
+        // for delete manifest entries (which haven't been produced yet).
+        // By processing delete manifests first, we ensure the delete consumer can finish,
+        // which then allows the data consumer to start draining the data channel.
+        manifest_files.sort_by_key(|m| match m.content {
+            ManifestContentType::Deletes => 0,
+            ManifestContentType::Data => 1,
+        });
 
         // TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
         let mut filtered_mfcs = vec![];
diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs
index d83da8a87..1f7fa50df 100644
--- a/crates/iceberg/src/scan/mod.rs
+++ b/crates/iceberg/src/scan/mod.rs
@@ -1170,6 +1170,97 @@ pub mod tests {
                 writer.close().unwrap();
             }
         }
+
+        pub async fn setup_deadlock_manifests(&mut self) {
+            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
+            let _parent_snapshot = current_snapshot
+                .parent_snapshot(self.table.metadata())
+                .unwrap();
+            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
+            let current_partition_spec = self.table.metadata().default_partition_spec();
+
+            // 1. Write DATA manifest with MULTIPLE entries to fill buffer
+            let mut writer = ManifestWriterBuilder::new(
+                self.next_manifest_file(),
+                Some(current_snapshot.snapshot_id()),
+                None,
+                current_schema.clone(),
+                current_partition_spec.as_ref().clone(),
+            )
+            .build_v2_data();
+
+            // Add 10 data entries
+            for i in 0..10 {
+                writer
+                    .add_entry(
+                        ManifestEntry::builder()
+                            .status(ManifestStatus::Added)
+                            .data_file(
+                                DataFileBuilder::default()
+                                    .partition_spec_id(0)
+                                    .content(DataContentType::Data)
+                                    .file_path(format!("{}/{}.parquet", &self.table_location, i))
+                                    .file_format(DataFileFormat::Parquet)
+                                    .file_size_in_bytes(100)
+                                    .record_count(1)
+                                    .partition(Struct::from_iter([Some(Literal::long(100))]))
+                                    .key_metadata(None)
+                                    .build()
+                                    .unwrap(),
+                            )
+                            .build(),
+                    )
+                    .unwrap();
+            }
+            let data_manifest = writer.write_manifest_file().await.unwrap();
+
+            // 2. Write DELETE manifest
+            let mut writer = ManifestWriterBuilder::new(
+                self.next_manifest_file(),
+                Some(current_snapshot.snapshot_id()),
+                None,
+                current_schema.clone(),
+                current_partition_spec.as_ref().clone(),
+            )
+            .build_v2_deletes();
+
+            writer
+                .add_entry(
+                    ManifestEntry::builder()
+                        .status(ManifestStatus::Added)
+                        .data_file(
+                            DataFileBuilder::default()
+                                .partition_spec_id(0)
+                                .content(DataContentType::PositionDeletes)
+                                .file_path(format!("{}/del.parquet", &self.table_location))
+                                .file_format(DataFileFormat::Parquet)
+                                .file_size_in_bytes(100)
+                                .record_count(1)
+                                .partition(Struct::from_iter([Some(Literal::long(100))]))
+                                .build()
+                                .unwrap(),
+                        )
+                        .build(),
+                )
+                .unwrap();
+            let delete_manifest = writer.write_manifest_file().await.unwrap();
+
+            // Write to manifest list - DATA FIRST then DELETE
+            // This order is crucial for reproduction
+            let mut manifest_list_write = ManifestListWriter::v2(
+                self.table
+                    .file_io()
+                    .new_output(current_snapshot.manifest_list())
+                    .unwrap(),
+                current_snapshot.snapshot_id(),
+                current_snapshot.parent_snapshot_id(),
+                current_snapshot.sequence_number(),
+            );
+            manifest_list_write
+                .add_manifests(vec![data_manifest, delete_manifest].into_iter())
+                .unwrap();
+            manifest_list_write.close().await.unwrap();
+        }
     }
 
     #[test]
@@ -2127,4 +2218,38 @@ pub mod tests {
             "_file column (duplicate) should use RunEndEncoded type"
         );
     }
+
+    #[tokio::test]
+    async fn test_scan_deadlock() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_deadlock_manifests().await;
+
+        // Create table scan with concurrency limit 1
+        // This sets channel size to 1.
+        // Data manifest has 10 entries -> will block producer.
+        // Delete manifest is 2nd in list -> won't be processed.
+        // Consumer 2 (Data) not started -> blocked.
+        // Consumer 1 (Delete) waiting -> blocked.
+        let table_scan = fixture
+            .table
+            .scan()
+            .with_concurrency_limit(1)
+            .build()
+            .unwrap();
+
+        // This should timeout/hang if deadlock exists
+        // We can use tokio::time::timeout
+        let result = tokio::time::timeout(std::time::Duration::from_secs(5), async {
+            table_scan
+                .plan_files()
+                .await
+                .unwrap()
+                .try_collect::<Vec<_>>()
+                .await
+        })
+        .await;
+
+        // Assert it finished (didn't timeout)
+        assert!(result.is_ok(), "Scan timed out - deadlock detected");
+    }
 }

Reply via email to