haohuaijin opened a new issue, #19398:
URL: https://github.com/apache/datafusion/issues/19398

   ### Describe the bug
   
   Currently in DataFusion, the projection pushdown optimizer rule runs before the optimizer rules that add `CoalesceBatchesExec` and `CooperativeExec`, so in that pass it works fine.
   
https://github.com/apache/datafusion/blob/d493f3d441aa30e7e83d4ff918e07681f985a966/datafusion/physical-optimizer/src/optimizer.rs#L122-L151
   
   But in another case, if you want to run projection pushdown again on a physical plan that has already gone through one round of the physical optimizer rules, projection pushdown is not able to push through `CoalesceBatchesExec` and `CooperativeExec`.
   
   ### To Reproduce
   
   ```rust
   use std::sync::Arc;
   
   use arrow::{
       array::{ArrayRef, Int32Array, RecordBatch},
       datatypes::{DataType, Field, Schema},
   };
   use datafusion::{
       common::config::ConfigOptions,
       datasource::MemTable,
       physical_expr::PhysicalExpr,
       physical_optimizer::{
           PhysicalOptimizerRule, projection_pushdown::ProjectionPushdown,
       },
       physical_plan::{
           ExecutionPlan, coalesce_batches::CoalesceBatchesExec, displayable,
           expressions::Column, projection::ProjectionExec,
       },
       prelude::{SessionContext, col, lit},
   };
   
   #[tokio::main]
   async fn main() -> Result<(), Box<dyn std::error::Error>> {
       let schema = Arc::new(Schema::new(vec![
           Field::new("a", DataType::Int32, false),
           Field::new("b", DataType::Int32, false),
           Field::new("c", DataType::Int32, false),
           Field::new("d", DataType::Int32, false),
       ]));
       let mut partitions = vec![];
       for _ in 0..4 {
           let batch = RecordBatch::try_new(
               schema.clone(),
               vec![
                   Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
                   Arc::new(Int32Array::from(vec![4, 5, 6])) as ArrayRef,
                   Arc::new(Int32Array::from(vec![7, 8, 9])) as ArrayRef,
                   Arc::new(Int32Array::from(vec![10, 11, 12])) as ArrayRef,
               ],
           )?;
           partitions.push(vec![batch]);
       }
       let table = MemTable::try_new(schema.clone(), partitions)?;
       let ctx = SessionContext::new();
       ctx.register_table("test", Arc::new(table))?;
   
       // physical plan after optimizer rules are applied
       let physical_plan = ctx
           .table("test")
           .await?
           .select_columns(&["a", "b", "c"])?
           .filter(col("c").gt(lit(0)))?
           .create_physical_plan()
           .await?;
       println!(
           "{}",
           displayable(physical_plan.as_ref())
               .set_show_schema(true)
               .indent(true)
       );
   
       // add projection and coalesce batches exec, then apply projection pushdown
       let physical_plan = Arc::new(ProjectionExec::try_new(
           vec![
               (
                   Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>,
                   "a".to_string(),
               ),
               (
                   Arc::new(Column::new("b", 1)) as Arc<dyn PhysicalExpr>,
                   "b".to_string(),
               ),
           ],
           Arc::new(CoalesceBatchesExec::new(physical_plan.clone(), 8192))
               as Arc<dyn ExecutionPlan>,
       )?);
       let config = ConfigOptions::new();
       let projection_pushdown = ProjectionPushdown::new();
       let optimized = projection_pushdown.optimize(physical_plan.clone(), &config)?;
       // projection pushdown is not applied
       println!(
           "{}",
           displayable(optimized.as_ref())
               .set_show_schema(true)
               .indent(true)
       );
   
       Ok(())
   }
   ```
   Result (in the second plan, the `ProjectionExec` is still sitting above `CoalesceBatchesExec` instead of being pushed down):
   ```
   FilterExec: c@2 > 0, schema=[a:Int32, b:Int32, c:Int32]
     DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], schema=[a:Int32, b:Int32, c:Int32]

   ProjectionExec: expr=[a@0 as a, b@1 as b], schema=[a:Int32, b:Int32]
     CoalesceBatchesExec: target_batch_size=8192, schema=[a:Int32, b:Int32, c:Int32]
       FilterExec: c@2 > 0, schema=[a:Int32, b:Int32, c:Int32]
         DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], schema=[a:Int32, b:Int32, c:Int32]
   ```
   
   ### Expected behavior
   
   Projection pushdown should be able to go through `CoalesceBatchesExec` and `CooperativeExec`.
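   
   For illustration only (this is not actual output), the second plan would presumably end up roughly like the sketch below after pushdown, with the `ProjectionExec` pushed beneath `CoalesceBatchesExec` (or absorbed further down once the projection reaches the scan):
   
   ```
   CoalesceBatchesExec: target_batch_size=8192, schema=[a:Int32, b:Int32]
     ProjectionExec: expr=[a@0 as a, b@1 as b], schema=[a:Int32, b:Int32]
       FilterExec: c@2 > 0, schema=[a:Int32, b:Int32, c:Int32]
         DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], schema=[a:Int32, b:Int32, c:Int32]
   ```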
   
   ### Additional context
   
   _No response_

