haohuaijin opened a new issue, #19398: URL: https://github.com/apache/datafusion/issues/19398
### Describe the bug

Currently in DataFusion, the projection pushdown optimizer rule runs before the optimizer rules that add `CoalesceBatchesExec` and `CooperativeExec`, so the normal pipeline works fine:

https://github.com/apache/datafusion/blob/d493f3d441aa30e7e83d4ff918e07681f985a966/datafusion/physical-optimizer/src/optimizer.rs#L122-L151

However, if you want to run projection pushdown again on a physical plan that has already gone through one round of physical optimizer rules, the pushdown is not able to pass through `CoalesceBatchesExec` and `CooperativeExec`.

### To Reproduce

```rust
use std::sync::Arc;

use arrow::{
    array::{ArrayRef, Int32Array, RecordBatch},
    datatypes::{DataType, Field, Schema},
};
use datafusion::{
    common::config::ConfigOptions,
    datasource::MemTable,
    physical_expr::PhysicalExpr,
    physical_optimizer::{
        PhysicalOptimizerRule, projection_pushdown::ProjectionPushdown,
    },
    physical_plan::{
        ExecutionPlan, coalesce_batches::CoalesceBatchesExec, displayable,
        expressions::Column, projection::ProjectionExec,
    },
    prelude::{SessionContext, col, lit},
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
        Field::new("c", DataType::Int32, false),
        Field::new("d", DataType::Int32, false),
    ]));

    let mut partitions = vec![];
    for _ in 0..4 {
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
                Arc::new(Int32Array::from(vec![4, 5, 6])) as ArrayRef,
                Arc::new(Int32Array::from(vec![7, 8, 9])) as ArrayRef,
                Arc::new(Int32Array::from(vec![10, 11, 12])) as ArrayRef,
            ],
        )?;
        partitions.push(vec![batch]);
    }

    let table = MemTable::try_new(schema.clone(), partitions)?;
    let ctx = SessionContext::new();
    ctx.register_table("test", Arc::new(table))?;

    // physical plan after optimizer rules are applied
    let physical_plan = ctx
        .table("test")
        .await?
        .select_columns(&["a", "b", "c"])?
        .filter(col("c").gt(lit(0)))?
        .create_physical_plan()
        .await?;
    println!(
        "{}",
        displayable(physical_plan.as_ref())
            .set_show_schema(true)
            .indent(true)
    );

    // add projection and coalesce batches exec, then apply projection pushdown
    let physical_plan = Arc::new(ProjectionExec::try_new(
        vec![
            (
                Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>,
                "a".to_string(),
            ),
            (
                Arc::new(Column::new("b", 1)) as Arc<dyn PhysicalExpr>,
                "b".to_string(),
            ),
        ],
        Arc::new(CoalesceBatchesExec::new(physical_plan.clone(), 8192))
            as Arc<dyn ExecutionPlan>,
    )?);

    let config = ConfigOptions::new();
    let projection_pushdown = ProjectionPushdown::new();
    let optimized = projection_pushdown.optimize(physical_plan.clone(), &config)?;

    // projection pushdown is not applied
    println!(
        "{}",
        displayable(optimized.as_ref())
            .set_show_schema(true)
            .indent(true)
    );

    Ok(())
}
```

result

```
FilterExec: c@2 > 0, schema=[a:Int32, b:Int32, c:Int32]
  DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], schema=[a:Int32, b:Int32, c:Int32]

ProjectionExec: expr=[a@0 as a, b@1 as b], schema=[a:Int32, b:Int32]
  CoalesceBatchesExec: target_batch_size=8192, schema=[a:Int32, b:Int32, c:Int32]
    FilterExec: c@2 > 0, schema=[a:Int32, b:Int32, c:Int32]
      DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], schema=[a:Int32, b:Int32, c:Int32]
```

### Expected behavior

Projection pushdown can go through `CoalesceBatchesExec` and `CooperativeExec`.

### Additional context

_No response_
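For illustration only, a sketch (not verified output) of the plan shape one would expect from the reproducer above if the pushdown could pass through `CoalesceBatchesExec`: the projection sits below the coalesce, and the coalesce schema narrows to the projected columns.

```
CoalesceBatchesExec: target_batch_size=8192, schema=[a:Int32, b:Int32]
  ProjectionExec: expr=[a@0 as a, b@1 as b], schema=[a:Int32, b:Int32]
    FilterExec: c@2 > 0, schema=[a:Int32, b:Int32, c:Int32]
      DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1], schema=[a:Int32, b:Int32, c:Int32]
```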
