This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new ee21563c2 perf(reader): Fast path ArrowReader::read when concurrency
is 1 to avoid waker churn and add determinism to FileScanTask processing (#2020)
ee21563c2 is described below
commit ee21563c2032948f636eae84870f317a0b299a05
Author: Matt Butrovich <[email protected]>
AuthorDate: Mon Jan 19 20:27:32 2026 -0500
perf(reader): Fast path ArrowReader::read when concurrency is 1 to avoid
waker churn and add determinism to FileScanTask processing (#2020)
## Which issue does this PR close?
- N/A.
## What changes are included in this PR?
- Due to the way Comet maps DataFusion `SessionContext`, the tokio
runtime, and Spark Tasks, we see frequent waker churn when concurrency
is set to 1 in the `ArrowReader`. This adds a fast path that does not
use `try_flatten_unordered` and its internal `replace_waker` calls.
- The fast path also prevents tasks from being reordered at runtime. Several
Iceberg Java tests expect specific query results without an `ORDER BY`,
so this enables those tests to keep working when concurrency is set to
1.
See https://github.com/apache/datafusion-comet/pull/3051 and the flamegraph below:
<img width="3804" height="754" alt="flamegraph"
src="https://github.com/user-attachments/assets/26b93e85-5835-4bf4-b7f1-b136face940d"
/>
## Are these changes tested?
Yes. A new test covers deterministic ordering, and the entire Iceberg Java Spark
suite is also being run via Comet in https://github.com/apache/datafusion-comet/pull/3051.
---------
Co-authored-by: Renjie Liu <[email protected]>
---
crates/iceberg/src/arrow/reader.rs | 225 +++++++++++++++++++++++++++++++++----
1 file changed, 206 insertions(+), 19 deletions(-)
diff --git a/crates/iceberg/src/arrow/reader.rs
b/crates/iceberg/src/arrow/reader.rs
index f7f90663a..aa45a1297 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -148,26 +148,53 @@ impl ArrowReader {
let row_group_filtering_enabled = self.row_group_filtering_enabled;
let row_selection_enabled = self.row_selection_enabled;
- let stream = tasks
- .map_ok(move |task| {
- let file_io = file_io.clone();
-
- Self::process_file_scan_task(
- task,
- batch_size,
- file_io,
- self.delete_file_loader.clone(),
- row_group_filtering_enabled,
- row_selection_enabled,
- )
- })
- .map_err(|err| {
- Error::new(ErrorKind::Unexpected, "file scan task generate
failed").with_source(err)
- })
- .try_buffer_unordered(concurrency_limit_data_files)
- .try_flatten_unordered(concurrency_limit_data_files);
+ // Fast-path for single concurrency to avoid overhead of
try_flatten_unordered
+ let stream: ArrowRecordBatchStream = if concurrency_limit_data_files
== 1 {
+ Box::pin(
+ tasks
+ .and_then(move |task| {
+ let file_io = file_io.clone();
+
+ Self::process_file_scan_task(
+ task,
+ batch_size,
+ file_io,
+ self.delete_file_loader.clone(),
+ row_group_filtering_enabled,
+ row_selection_enabled,
+ )
+ })
+ .map_err(|err| {
+ Error::new(ErrorKind::Unexpected, "file scan task
generate failed")
+ .with_source(err)
+ })
+ .try_flatten(),
+ )
+ } else {
+ Box::pin(
+ tasks
+ .map_ok(move |task| {
+ let file_io = file_io.clone();
+
+ Self::process_file_scan_task(
+ task,
+ batch_size,
+ file_io,
+ self.delete_file_loader.clone(),
+ row_group_filtering_enabled,
+ row_selection_enabled,
+ )
+ })
+ .map_err(|err| {
+ Error::new(ErrorKind::Unexpected, "file scan task
generate failed")
+ .with_source(err)
+ })
+ .try_buffer_unordered(concurrency_limit_data_files)
+ .try_flatten_unordered(concurrency_limit_data_files),
+ )
+ };
- Ok(Box::pin(stream) as ArrowRecordBatchStream)
+ Ok(stream)
}
#[allow(clippy::too_many_arguments)]
@@ -3894,6 +3921,166 @@ message schema {
assert!(result.is_empty() || result.iter().all(|batch|
batch.num_rows() == 0));
}
+ /// Test that concurrency=1 reads all files correctly and in deterministic
order.
+ /// This verifies the fast-path optimization for single concurrency.
+ #[tokio::test]
+ async fn test_read_with_concurrency_one() {
+ use arrow_array::Int32Array;
+
+ let schema = Arc::new(
+ Schema::builder()
+ .with_schema_id(1)
+ .with_fields(vec![
+ NestedField::required(1, "id",
Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::required(2, "file_num",
Type::Primitive(PrimitiveType::Int))
+ .into(),
+ ])
+ .build()
+ .unwrap(),
+ );
+
+ let arrow_schema = Arc::new(ArrowSchema::new(vec![
+ Field::new("id", DataType::Int32,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "1".to_string(),
+ )])),
+ Field::new("file_num", DataType::Int32,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "2".to_string(),
+ )])),
+ ]));
+
+ let tmp_dir = TempDir::new().unwrap();
+ let table_location = tmp_dir.path().to_str().unwrap().to_string();
+ let file_io =
FileIO::from_path(&table_location).unwrap().build().unwrap();
+
+ // Create 3 parquet files with different data
+ let props = WriterProperties::builder()
+ .set_compression(Compression::SNAPPY)
+ .build();
+
+ for file_num in 0..3 {
+ let id_data = Arc::new(Int32Array::from_iter_values(
+ file_num * 10..(file_num + 1) * 10,
+ )) as ArrayRef;
+ let file_num_data = Arc::new(Int32Array::from(vec![file_num; 10]))
as ArrayRef;
+
+ let to_write =
+ RecordBatch::try_new(arrow_schema.clone(), vec![id_data,
file_num_data]).unwrap();
+
+ let file =
File::create(format!("{table_location}/file_{file_num}.parquet")).unwrap();
+ let mut writer =
+ ArrowWriter::try_new(file, to_write.schema(),
Some(props.clone())).unwrap();
+ writer.write(&to_write).expect("Writing batch");
+ writer.close().unwrap();
+ }
+
+ // Read with concurrency=1 (fast-path)
+ let reader = ArrowReaderBuilder::new(file_io)
+ .with_data_file_concurrency_limit(1)
+ .build();
+
+ // Create tasks in a specific order: file_0, file_1, file_2
+ let tasks = vec![
+ Ok(FileScanTask {
+ start: 0,
+ length: 0,
+ record_count: None,
+ data_file_path: format!("{table_location}/file_0.parquet"),
+ data_file_format: DataFileFormat::Parquet,
+ schema: schema.clone(),
+ project_field_ids: vec![1, 2],
+ predicate: None,
+ deletes: vec![],
+ partition: None,
+ partition_spec: None,
+ name_mapping: None,
+ case_sensitive: false,
+ }),
+ Ok(FileScanTask {
+ start: 0,
+ length: 0,
+ record_count: None,
+ data_file_path: format!("{table_location}/file_1.parquet"),
+ data_file_format: DataFileFormat::Parquet,
+ schema: schema.clone(),
+ project_field_ids: vec![1, 2],
+ predicate: None,
+ deletes: vec![],
+ partition: None,
+ partition_spec: None,
+ name_mapping: None,
+ case_sensitive: false,
+ }),
+ Ok(FileScanTask {
+ start: 0,
+ length: 0,
+ record_count: None,
+ data_file_path: format!("{table_location}/file_2.parquet"),
+ data_file_format: DataFileFormat::Parquet,
+ schema: schema.clone(),
+ project_field_ids: vec![1, 2],
+ predicate: None,
+ deletes: vec![],
+ partition: None,
+ partition_spec: None,
+ name_mapping: None,
+ case_sensitive: false,
+ }),
+ ];
+
+ let tasks_stream = Box::pin(futures::stream::iter(tasks)) as
FileScanTaskStream;
+
+ let result = reader
+ .read(tasks_stream)
+ .unwrap()
+ .try_collect::<Vec<RecordBatch>>()
+ .await
+ .unwrap();
+
+ // Verify we got all 30 rows (10 from each file)
+ let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
+ assert_eq!(total_rows, 30, "Should have 30 total rows");
+
+ // Collect all ids and file_nums to verify data
+ let mut all_ids = Vec::new();
+ let mut all_file_nums = Vec::new();
+
+ for batch in &result {
+ let id_col = batch
+ .column(0)
+ .as_primitive::<arrow_array::types::Int32Type>();
+ let file_num_col = batch
+ .column(1)
+ .as_primitive::<arrow_array::types::Int32Type>();
+
+ for i in 0..batch.num_rows() {
+ all_ids.push(id_col.value(i));
+ all_file_nums.push(file_num_col.value(i));
+ }
+ }
+
+ assert_eq!(all_ids.len(), 30);
+ assert_eq!(all_file_nums.len(), 30);
+
+ // With concurrency=1 and sequential processing, files should be
processed in order
+ // file_0: ids 0-9, file_num=0
+ // file_1: ids 10-19, file_num=1
+ // file_2: ids 20-29, file_num=2
+ for i in 0..10 {
+ assert_eq!(all_file_nums[i], 0, "First 10 rows should be from
file_0");
+ assert_eq!(all_ids[i], i as i32, "IDs should be 0-9");
+ }
+ for i in 10..20 {
+ assert_eq!(all_file_nums[i], 1, "Next 10 rows should be from
file_1");
+ assert_eq!(all_ids[i], i as i32, "IDs should be 10-19");
+ }
+ for i in 20..30 {
+ assert_eq!(all_file_nums[i], 2, "Last 10 rows should be from
file_2");
+ assert_eq!(all_ids[i], i as i32, "IDs should be 20-29");
+ }
+ }
+
/// Test bucket partitioning reads source column from data file (not
partition metadata).
///
/// This is an integration test verifying the complete ArrowReader
pipeline with bucket partitioning.