This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new ee21563c2 perf(reader): Fast path ArrowReader::read when concurrency 
is 1 to avoid waker churn and add determinism to FileScanTask processing (#2020)
ee21563c2 is described below

commit ee21563c2032948f636eae84870f317a0b299a05
Author: Matt Butrovich <[email protected]>
AuthorDate: Mon Jan 19 20:27:32 2026 -0500

    perf(reader): Fast path ArrowReader::read when concurrency is 1 to avoid 
waker churn and add determinism to FileScanTask processing (#2020)
    
    ## Which issue does this PR close?
    
    - N/A.
    
    ## What changes are included in this PR?
    
    - Due to the way Comet maps DataFusion `SessionContext`, the tokio
    runtime, and Spark Tasks, we see frequent waker churn when concurrency
    is set to 1 in the `ArrowReader`. This adds a fast path that does not
    use `try_flatten_unordered` and its internal `replace_waker` calls.
    - With concurrency set to 1, the fast path also prevents tasks from
    being reordered at runtime. Several Iceberg Java tests expect specific
    query results without an `ORDER BY`, so this enables those tests to
    keep working in that configuration.
    
    See https://github.com/apache/datafusion-comet/pull/3051 and the
    flamegraph below:
    
    <img width="3804" height="754" alt="flamegraph"
    src="https://github.com/user-attachments/assets/26b93e85-5835-4bf4-b7f1-b136face940d"
    />
    
    ## Are these changes tested?
    
    
    A new test covers deterministic ordering when concurrency is 1. The
    entire Iceberg Java Spark suite is also being run via Comet in
    https://github.com/apache/datafusion-comet/pull/3051.
    
    ---------
    
    Co-authored-by: Renjie Liu <[email protected]>
---
 crates/iceberg/src/arrow/reader.rs | 225 +++++++++++++++++++++++++++++++++----
 1 file changed, 206 insertions(+), 19 deletions(-)

diff --git a/crates/iceberg/src/arrow/reader.rs 
b/crates/iceberg/src/arrow/reader.rs
index f7f90663a..aa45a1297 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -148,26 +148,53 @@ impl ArrowReader {
         let row_group_filtering_enabled = self.row_group_filtering_enabled;
         let row_selection_enabled = self.row_selection_enabled;
 
-        let stream = tasks
-            .map_ok(move |task| {
-                let file_io = file_io.clone();
-
-                Self::process_file_scan_task(
-                    task,
-                    batch_size,
-                    file_io,
-                    self.delete_file_loader.clone(),
-                    row_group_filtering_enabled,
-                    row_selection_enabled,
-                )
-            })
-            .map_err(|err| {
-                Error::new(ErrorKind::Unexpected, "file scan task generate 
failed").with_source(err)
-            })
-            .try_buffer_unordered(concurrency_limit_data_files)
-            .try_flatten_unordered(concurrency_limit_data_files);
+        // Fast-path for single concurrency to avoid overhead of 
try_flatten_unordered
+        let stream: ArrowRecordBatchStream = if concurrency_limit_data_files 
== 1 {
+            Box::pin(
+                tasks
+                    .and_then(move |task| {
+                        let file_io = file_io.clone();
+
+                        Self::process_file_scan_task(
+                            task,
+                            batch_size,
+                            file_io,
+                            self.delete_file_loader.clone(),
+                            row_group_filtering_enabled,
+                            row_selection_enabled,
+                        )
+                    })
+                    .map_err(|err| {
+                        Error::new(ErrorKind::Unexpected, "file scan task 
generate failed")
+                            .with_source(err)
+                    })
+                    .try_flatten(),
+            )
+        } else {
+            Box::pin(
+                tasks
+                    .map_ok(move |task| {
+                        let file_io = file_io.clone();
+
+                        Self::process_file_scan_task(
+                            task,
+                            batch_size,
+                            file_io,
+                            self.delete_file_loader.clone(),
+                            row_group_filtering_enabled,
+                            row_selection_enabled,
+                        )
+                    })
+                    .map_err(|err| {
+                        Error::new(ErrorKind::Unexpected, "file scan task 
generate failed")
+                            .with_source(err)
+                    })
+                    .try_buffer_unordered(concurrency_limit_data_files)
+                    .try_flatten_unordered(concurrency_limit_data_files),
+            )
+        };
 
-        Ok(Box::pin(stream) as ArrowRecordBatchStream)
+        Ok(stream)
     }
 
     #[allow(clippy::too_many_arguments)]
@@ -3894,6 +3921,166 @@ message schema {
         assert!(result.is_empty() || result.iter().all(|batch| 
batch.num_rows() == 0));
     }
 
+    /// Test that concurrency=1 reads all files correctly and in deterministic 
order.
+    /// This verifies the fast-path optimization for single concurrency.
+    #[tokio::test]
+    async fn test_read_with_concurrency_one() {
+        use arrow_array::Int32Array;
+
+        let schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "id", 
Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::required(2, "file_num", 
Type::Primitive(PrimitiveType::Int))
+                        .into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, 
false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+            Field::new("file_num", DataType::Int32, 
false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "2".to_string(),
+            )])),
+        ]));
+
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().to_str().unwrap().to_string();
+        let file_io = 
FileIO::from_path(&table_location).unwrap().build().unwrap();
+
+        // Create 3 parquet files with different data
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        for file_num in 0..3 {
+            let id_data = Arc::new(Int32Array::from_iter_values(
+                file_num * 10..(file_num + 1) * 10,
+            )) as ArrayRef;
+            let file_num_data = Arc::new(Int32Array::from(vec![file_num; 10])) 
as ArrayRef;
+
+            let to_write =
+                RecordBatch::try_new(arrow_schema.clone(), vec![id_data, 
file_num_data]).unwrap();
+
+            let file = 
File::create(format!("{table_location}/file_{file_num}.parquet")).unwrap();
+            let mut writer =
+                ArrowWriter::try_new(file, to_write.schema(), 
Some(props.clone())).unwrap();
+            writer.write(&to_write).expect("Writing batch");
+            writer.close().unwrap();
+        }
+
+        // Read with concurrency=1 (fast-path)
+        let reader = ArrowReaderBuilder::new(file_io)
+            .with_data_file_concurrency_limit(1)
+            .build();
+
+        // Create tasks in a specific order: file_0, file_1, file_2
+        let tasks = vec![
+            Ok(FileScanTask {
+                start: 0,
+                length: 0,
+                record_count: None,
+                data_file_path: format!("{table_location}/file_0.parquet"),
+                data_file_format: DataFileFormat::Parquet,
+                schema: schema.clone(),
+                project_field_ids: vec![1, 2],
+                predicate: None,
+                deletes: vec![],
+                partition: None,
+                partition_spec: None,
+                name_mapping: None,
+                case_sensitive: false,
+            }),
+            Ok(FileScanTask {
+                start: 0,
+                length: 0,
+                record_count: None,
+                data_file_path: format!("{table_location}/file_1.parquet"),
+                data_file_format: DataFileFormat::Parquet,
+                schema: schema.clone(),
+                project_field_ids: vec![1, 2],
+                predicate: None,
+                deletes: vec![],
+                partition: None,
+                partition_spec: None,
+                name_mapping: None,
+                case_sensitive: false,
+            }),
+            Ok(FileScanTask {
+                start: 0,
+                length: 0,
+                record_count: None,
+                data_file_path: format!("{table_location}/file_2.parquet"),
+                data_file_format: DataFileFormat::Parquet,
+                schema: schema.clone(),
+                project_field_ids: vec![1, 2],
+                predicate: None,
+                deletes: vec![],
+                partition: None,
+                partition_spec: None,
+                name_mapping: None,
+                case_sensitive: false,
+            }),
+        ];
+
+        let tasks_stream = Box::pin(futures::stream::iter(tasks)) as 
FileScanTaskStream;
+
+        let result = reader
+            .read(tasks_stream)
+            .unwrap()
+            .try_collect::<Vec<RecordBatch>>()
+            .await
+            .unwrap();
+
+        // Verify we got all 30 rows (10 from each file)
+        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 30, "Should have 30 total rows");
+
+        // Collect all ids and file_nums to verify data
+        let mut all_ids = Vec::new();
+        let mut all_file_nums = Vec::new();
+
+        for batch in &result {
+            let id_col = batch
+                .column(0)
+                .as_primitive::<arrow_array::types::Int32Type>();
+            let file_num_col = batch
+                .column(1)
+                .as_primitive::<arrow_array::types::Int32Type>();
+
+            for i in 0..batch.num_rows() {
+                all_ids.push(id_col.value(i));
+                all_file_nums.push(file_num_col.value(i));
+            }
+        }
+
+        assert_eq!(all_ids.len(), 30);
+        assert_eq!(all_file_nums.len(), 30);
+
+        // With concurrency=1 and sequential processing, files should be 
processed in order
+        // file_0: ids 0-9, file_num=0
+        // file_1: ids 10-19, file_num=1
+        // file_2: ids 20-29, file_num=2
+        for i in 0..10 {
+            assert_eq!(all_file_nums[i], 0, "First 10 rows should be from 
file_0");
+            assert_eq!(all_ids[i], i as i32, "IDs should be 0-9");
+        }
+        for i in 10..20 {
+            assert_eq!(all_file_nums[i], 1, "Next 10 rows should be from 
file_1");
+            assert_eq!(all_ids[i], i as i32, "IDs should be 10-19");
+        }
+        for i in 20..30 {
+            assert_eq!(all_file_nums[i], 2, "Last 10 rows should be from 
file_2");
+            assert_eq!(all_ids[i], i as i32, "IDs should be 20-29");
+        }
+    }
+
     /// Test bucket partitioning reads source column from data file (not 
partition metadata).
     ///
     /// This is an integration test verifying the complete ArrowReader 
pipeline with bucket partitioning.

Reply via email to