alamb opened a new issue, #16717:
URL: https://github.com/apache/datafusion/issues/16717

   ### Is your feature request related to a problem or challenge?
   
   Most of DataFusion is carefully designed to operate on batches of approximately the target batch size (e.g. 8k rows) at a time.
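
   For reference, that target is a per-session setting; a minimal sketch of configuring it (using the standard `SessionConfig::with_batch_size` API, default 8192 rows):

   ```rust
   use datafusion::prelude::*;

   fn main() {
       // target_batch_size defaults to 8192 rows and is a per-session setting
       let config = SessionConfig::new().with_batch_size(8192);
       let _ctx = SessionContext::new_with_config(config);
   }
   ```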
   
   However, some data sources may produce much larger batches. For example, [`MemTable`](https://docs.rs/datafusion/latest/datafusion/catalog/struct.MemTable.html) provides exactly the `RecordBatch`es it was constructed with -- no additional splitting is done.
   
   In some cases, such as the one @wegamekinglc reported in https://github.com/apache/datafusion/issues/16707, this means DataFusion cannot effectively take advantage of all the available cores.
   
   In that case, the datafusion-python bindings create a `MemTable` like this:
   * https://github.com/apache/datafusion-python/blob/2e1b71369eefc97c22b82be84bbabb414f748fb9/src/context.rs#L542-L572
   * Which then calls https://github.com/apache/datafusion-python/blob/2e1b71369eefc97c22b82be84bbabb414f748fb9/src/context.rs#L473
   
   You can see this with the following reproducer:
   * A single large record batch completes in `1.161840792s`
   * The same rows in 10k-row batches complete in `86.385875ms` (approximately 1/16 of the time, as my laptop has 16 cores)
   
   The plan is properly prepared to repartition the input from `DataSourceExec` (via `RepartitionExec`), but since there is only a single input batch, no repartitioning actually happens:
   
   ```
   +---------------+-----------------------------------------------------------------------------------+
   | plan_type     | plan                                                                              |
   +---------------+-----------------------------------------------------------------------------------+
   | logical_plan  | Projection: t.name, avg(t.value) AS avg_value                                     |
   |               |   Aggregate: groupBy=[[t.name]], aggr=[[avg(CAST(t.value AS Float64))]]           |
   |               |     TableScan: t projection=[name, value]                                         |
   | physical_plan | ProjectionExec: expr=[name@0 as name, avg(t.value)@1 as avg_value]                |
   |               |   AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[avg(t.value)] |
   |               |     CoalesceBatchesExec: target_batch_size=8192                                   |
   |               |       RepartitionExec: partitioning=Hash([name@0], 16), input_partitions=16       |
   |               |         AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[avg(t.value)]    |
   |               |           RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1   |
   |               |             DataSourceExec: partitions=1, partition_sizes=[1]                     |
   |               |                                                                                   |
   +---------------+-----------------------------------------------------------------------------------+
   ```
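
   The reason the existing `RepartitionExec` cannot help here is that `RoundRobinBatch` deals out *whole batches* to its output partitions rather than splitting them. A rough sketch of that behavior (not DataFusion's actual code):

   ```rust
   // Rough sketch of round-robin repartitioning: whole batches are dealt out
   // to output partitions, so a single input batch lands in exactly one
   // partition and the other 15 stay empty.
   fn round_robin<T>(batches: Vec<T>, num_partitions: usize) -> Vec<Vec<T>> {
       let mut partitions: Vec<Vec<T>> = (0..num_partitions).map(|_| Vec::new()).collect();
       for (i, batch) in batches.into_iter().enumerate() {
           partitions[i % num_partitions].push(batch);
       }
       partitions
   }

   fn main() {
       // one input batch, 16 target partitions -> only partition 0 gets data
       let out = round_robin(vec!["the-one-big-batch"], 16);
       assert_eq!(out[0].len(), 1);
       assert!(out[1..].iter().all(|p| p.is_empty()));
   }
   ```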
   
   To reproduce
   
   ```shell
   cargo run --release
   ```
   
   <details><summary>Code</summary>
   <p>
   
   ```rust
   use arrow::array::ArrayRef;
   use std::sync::{Arc, LazyLock};
   use arrow::array::Int32Array;
   use arrow::array::{Array, StringViewBuilder};
   use arrow::record_batch::RecordBatch;
   use datafusion::catalog::TableProvider;
   use datafusion::datasource::MemTable;
   use datafusion::prelude::*;
   use rand::Rng;
   
   #[tokio::main]
   async fn main() {
       let sql = "select name, avg(value) as avg_value from t group by name";
   
       // Case 1: the table is backed by a single large batch
       let ctx = SessionContext::new();
       ctx.register_table("t", one_large_table()).unwrap();
       println!("Registered one_large_table");
       let start = std::time::Instant::now();
       // print the plan, then run the query (both included in the timing)
       ctx.sql(sql).await.unwrap().explain(false, false).unwrap().show().await.unwrap();
       ctx.sql(sql).await.unwrap().show().await.unwrap();
       println!("Query executed in: {:?}", start.elapsed());
   
       // register the same table with many small batches
       let ctx = SessionContext::new();
       ctx.register_table("t", many_small_batches()).unwrap();
       println!("Registered many_small_batches");
       let start = std::time::Instant::now();
       ctx.sql(sql).await.unwrap().show().await.unwrap();
       println!("Query executed in: {:?}", start.elapsed());
   
   }
   
   // Creates a table with 100M rows split into 1 batch of 100M rows.
   fn one_large_table() -> Arc<dyn TableProvider> {
       let batch = BATCH.clone();
       Arc::new(MemTable::try_new(batch.schema(), vec![vec![batch]]).unwrap())
   }
   
   // Creates a table with 100M rows split into 10,000 batches of 10K rows each.
   fn many_small_batches() -> Arc<dyn TableProvider> {
       let batch_size = 10_000;
       let mut batch = BATCH.clone();
       let mut batches = vec![];
       loop {
           let num_to_take = std::cmp::min(batch_size, batch.num_rows());
           let small_batch = batch.slice(0, num_to_take);
           batches.push(small_batch);
           if batch.num_rows() > num_to_take {
               batch = batch.slice(num_to_take, batch.num_rows() - num_to_take);
           } else {
               break;
           }
       }
       Arc::new(MemTable::try_new(batch.schema(), vec![batches]).unwrap())
   }
   
   static BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
       let num_rows = 100_000_000;
       let names = "abcdefghijklmnopqrstuvwxyz";
   
       let mut builder = StringViewBuilder::new();
       for i in 0..num_rows {
           let idx = i % names.len();
           builder.append_value(&names[idx..idx+1]);
       }
       let names_array = builder.finish();
   
       let mut rng = rand::rng();
       let values_array = (0..names_array.len())
           .map(|_| rng.random_range(0..100))
           .collect::<Int32Array>();
   
       RecordBatch::try_from_iter(vec![
           ("name", Arc::new(names_array) as ArrayRef),
           ("value", Arc::new(values_array) as ArrayRef),
       ]).unwrap()
   });
   
   ```
   
   </p>
   </details> 
   
   ### Describe the solution you'd like
   
   I would like DataFusion to use more cores even when the user provides a single large record batch.
   
   
   ### Describe alternatives you've considered
   
   Here are some ideas from https://github.com/apache/datafusion/issues/16707:
   
   I suggested:
   > So one thing we could potentially do here is make RepartitionExec smarter for large batches, and if the input is really large split it into smaller batch sizes or something 🤔
   
   
   @ozankabak suggested fixing the `DataSourceExec`:
   > Isn't the easiest solution to update DataSourceExec to adhere to target batch size?
   
   
   I would tend to agree: if we can make DataSourceExec split up large batches, that might be the best / most general solution here.
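
   For illustration, here is a hedged sketch of the kind of splitting such a fix could do (`split_batch` is a hypothetical helper, not an existing DataFusion API). Since `RecordBatch::slice` is zero-copy, the split itself is cheap:

   ```rust
   use arrow::array::{ArrayRef, Int32Array};
   use arrow::record_batch::RecordBatch;
   use std::sync::Arc;

   /// Hypothetical helper: split a batch into zero-copy slices of at most
   /// `target_batch_size` rows each.
   fn split_batch(batch: RecordBatch, target_batch_size: usize) -> Vec<RecordBatch> {
       let mut out = Vec::new();
       let mut offset = 0;
       while offset < batch.num_rows() {
           let len = target_batch_size.min(batch.num_rows() - offset);
           out.push(batch.slice(offset, len));
           offset += len;
       }
       out
   }

   fn main() {
       let values: ArrayRef = Arc::new(Int32Array::from_iter_values(0..100_000));
       let batch = RecordBatch::try_from_iter(vec![("value", values)]).unwrap();
       let parts = split_batch(batch, 8192);
       // ceil(100_000 / 8192) = 13 slices, the last one partial
       assert_eq!(parts.len(), 13);
       assert!(parts.iter().all(|b| b.num_rows() <= 8192));
   }
   ```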
   
   ### Additional context
   
   _No response_

