alamb opened a new issue, #16717: URL: https://github.com/apache/datafusion/issues/16717
### Is your feature request related to a problem or challenge?

Most of DataFusion is carefully designed to operate on roughly target-batch-size partitions (e.g. 8k rows) at a time. However, some data sources may produce much larger batches. For example, [`MemTable`](https://docs.rs/datafusion/latest/datafusion/catalog/struct.MemTable.html) provides the RecordBatches it was constructed with; no additional splitting is done.

In some cases, such as the one @wegamekinglc reported in https://github.com/apache/datafusion/issues/16707, this means DataFusion cannot effectively take advantage of all the available cores. In that case, the datafusion-python bindings create a MemTable like this:

* https://github.com/apache/datafusion-python/blob/2e1b71369eefc97c22b82be84bbabb414f748fb9/src/context.rs#L542-L572
* Which then calls https://github.com/apache/datafusion-python/blob/2e1b71369eefc97c22b82be84bbabb414f748fb9/src/context.rs#L473

You can see this with the following reproducer:

* A single large record batch completes in `1.161840792s`
* The same rows with 10k rows per batch complete in `86.385875ms` (approximately 1/16 of the time, as my laptop has 16 cores)

The plan is properly prepared to repartition the input from DataSourceExec into smaller batches (via RepartitionExec), but since there is only a single batch, no repartitioning happens:

```
+---------------+-----------------------------------------------------------------------------------+
| plan_type     | plan                                                                              |
+---------------+-----------------------------------------------------------------------------------+
| logical_plan  | Projection: t.name, avg(t.value) AS avg_value                                     |
|               |   Aggregate: groupBy=[[t.name]], aggr=[[avg(CAST(t.value AS Float64))]]           |
|               |     TableScan: t projection=[name, value]                                         |
| physical_plan | ProjectionExec: expr=[name@0 as name, avg(t.value)@1 as avg_value]                |
|               |   AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[avg(t.value)] |
|               |     CoalesceBatchesExec: target_batch_size=8192                                   |
|               |       RepartitionExec: partitioning=Hash([name@0], 16), input_partitions=16       |
|               |         AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[avg(t.value)]    |
|               |           RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1   |
|               |             DataSourceExec: partitions=1, partition_sizes=[1]                     |
|               |                                                                                   |
+---------------+-----------------------------------------------------------------------------------+
```

To reproduce:

```shell
cargo run --release
```

<details><summary>Code</summary>
<p>

```rust
use std::sync::{Arc, LazyLock};

use arrow::array::{Array, ArrayRef, Int32Array, StringViewBuilder};
use arrow::record_batch::RecordBatch;
use datafusion::catalog::TableProvider;
use datafusion::datasource::MemTable;
use datafusion::prelude::*;
use rand::Rng;

#[tokio::main]
async fn main() {
    let sql = "select name, avg(value) as avg_value from t group by name";

    // with one large table
    let ctx = SessionContext::new();
    ctx.register_table("t", one_large_table()).unwrap();
    println!("Registered one_large_table");
    let start = std::time::Instant::now();
    ctx.sql(sql).await.unwrap().explain(false, false).unwrap().show().await.unwrap();
    ctx.sql(sql).await.unwrap().show().await.unwrap();
    println!("Query executed in: {:?}", start.elapsed());

    // register the same table with many small batches
    let ctx = SessionContext::new();
    ctx.register_table("t", many_small_batches()).unwrap();
    println!("Registered many_small_batches");
    let start = std::time::Instant::now();
    ctx.sql(sql).await.unwrap().show().await.unwrap();
    println!("Query executed in: {:?}", start.elapsed());
}

// Creates a table with 100M rows split into 1 batch of 100M rows.
fn one_large_table() -> Arc<dyn TableProvider> {
    let batch = BATCH.clone();
    Arc::new(MemTable::try_new(batch.schema(), vec![vec![batch]]).unwrap())
}

// Creates a table with 100M rows split into 10,000 batches of 10K rows each.
fn many_small_batches() -> Arc<dyn TableProvider> {
    let batch_size = 10_000;
    let mut batch = BATCH.clone();
    let mut batches = vec![];
    loop {
        let num_to_take = std::cmp::min(batch_size, batch.num_rows());
        let small_batch = batch.slice(0, num_to_take);
        batches.push(small_batch);
        if batch.num_rows() > num_to_take {
            batch = batch.slice(num_to_take, batch.num_rows() - num_to_take);
        } else {
            break;
        }
    }
    Arc::new(MemTable::try_new(batch.schema(), vec![batches]).unwrap())
}

static BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
    let num_rows = 100_000_000;
    let names = "abcdefghijklmnopqrstuvwxyz";
    let mut builder = StringViewBuilder::new();
    for i in 0..num_rows {
        let idx = i % names.len();
        builder.append_value(&names[idx..idx + 1]);
    }
    let names_array = builder.finish();
    let mut rng = rand::rng();
    let values_array = (0..names_array.len())
        .map(|_| rng.random_range(0..100))
        .collect::<Int32Array>();
    RecordBatch::try_from_iter(vec![
        ("name", Arc::new(names_array) as ArrayRef),
        ("value", Arc::new(values_array) as ArrayRef),
    ])
    .unwrap()
});
```

</p>
</details>

### Describe the solution you'd like

I would like to use more cores, even when the user passes a single large record batch.

### Describe alternatives you've considered

Here are some ideas from https://github.com/apache/datafusion/issues/16707. I suggested:

> So one thing we could potentially do here is make RepartitionExec smarter for large batches, and if the input is really large split it into smaller batch sizes or something 🤔

@ozankabak suggested fixing the `DataSourceExec`:

> Isn't the easiest solution to update DataSourceExec to adhere to target batch size?

I would tend to agree that if we can make DataSourceExec split up large batches, that might be the best / most general solution here.

### Additional context

_No response_

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
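As a rough sketch of the `DataSourceExec` idea discussed above: since Arrow's `RecordBatch::slice(offset, length)` is zero-copy, re-slicing one oversized batch into target-sized views is cheap; the only real logic is the chunking arithmetic. The snippet below shows just that arithmetic in plain Rust (the `slice_plan` name is hypothetical, not a DataFusion API), under the assumption that the source would then emit `batch.slice(offset, len)` for each pair:

```rust
/// Hypothetical helper: (offset, length) pairs covering `num_rows` rows in
/// chunks of at most `target_batch_size` rows, as a data source could use to
/// re-slice an oversized RecordBatch before handing it to RepartitionExec.
fn slice_plan(num_rows: usize, target_batch_size: usize) -> Vec<(usize, usize)> {
    let mut chunks = Vec::new();
    let mut offset = 0;
    while offset < num_rows {
        // Last chunk may be shorter than the target.
        let len = target_batch_size.min(num_rows - offset);
        chunks.push((offset, len));
        offset += len;
    }
    chunks
}

fn main() {
    // 100M rows at the default target_batch_size of 8192 -> 12208 slices,
    // with a final partial slice of 256 rows.
    let chunks = slice_plan(100_000_000, 8192);
    assert_eq!(chunks.len(), 12208);
    assert_eq!(chunks[0], (0, 8192));
    assert_eq!(*chunks.last().unwrap(), (99_999_744, 256));
    println!("{} slices", chunks.len());
}
```

With many small slices instead of one giant batch, the existing `RepartitionExec: partitioning=RoundRobinBatch(16)` in the plan above would have units of work to distribute across cores, which is exactly what the 10k-rows-per-batch reproducer demonstrates manually.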