jizezhang commented on issue #18782:
URL: https://github.com/apache/datafusion/issues/18782#issuecomment-3568216125

   Hi @alamb, I would like to confirm the expected behavior of one test with you 
https://github.com/apache/datafusion/blob/fc77be94570e3ada7e28db8c5412125f54e0b96d/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs#L267
 as it relates to integrating with `BatchCoalescer`. 
   
   I noticed that this test runs two versions of the same query, one with the 
DataFrame API and one with `SessionContext::sql`. The two produce slightly 
different logical plans, in particular in the `projection` part:
   - With DataFrame API
     ```
     [2025-11-22T21:29:47Z DEBUG datafusion_optimizer::utils] Final optimized plan:
       Aggregate: groupBy=[[]], aggr=[[count(?table?.flag)]]
         TableScan: ?table? projection=[flag], full_filters=[?table?.flag = Int32(0)]
     ```
   - With `sql`,
     ```
     [2025-11-22T21:31:15Z DEBUG datafusion_optimizer::utils] Final optimized plan:
       Projection: count(Int64(1)) AS count(*)
         Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
           TableScan: data projection=[], full_filters=[data.flag = Int32(0)]
     ```
   The `SessionContext::sql` version ran into an issue when using the arrow 
`BatchCoalescer` kernel via `LimitedBatchCoalescer`. The reason (I think) is 
that the custom table provider `CustomProvider` in the test picks its schema 
based on `projection` 
https://github.com/apache/datafusion/blob/fc77be94570e3ada7e28db8c5412125f54e0b96d/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs#L208-L210
   so the resulting physical plans differ in `schema`:
   - With DataFrame API,
     ```
     AggregateExec: mode=Final, gby=[], aggr=[count(?table?.flag)], schema=[count(?table?.flag):Int64]
     CoalescePartitionsExec, schema=[count(?table?.flag)[count]:Int64]
       AggregateExec: mode=Partial, gby=[], aggr=[count(?table?.flag)], schema=[count(?table?.flag)[count]:Int64]
         RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, schema=[flag:Int32]
           CooperativeExec, schema=[flag:Int32]
             CustomPlan: batch_size=1, schema=[flag:Int32]
     ```
   - With `sql`,
     ```
     ProjectionExec: expr=[count(Int64(1))@0 as count(*)], schema=[count(*):Int64]
     AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))], schema=[count(Int64(1)):Int64]
       CoalescePartitionsExec, schema=[count(Int64(1))[count]:Int64]
         AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))], schema=[count(Int64(1))[count]:Int64]
           RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, schema=[]
             CooperativeExec, schema=[]
               CustomPlan: batch_size=1, schema=[]
     ```
   However, the batches returned by the associated custom execution plan 
`CustomPlan` are always full (not projected) 
https://github.com/apache/datafusion/blob/fc77be94570e3ada7e28db8c5412125f54e0b96d/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs#L139
   
   Arrow `BatchCoalescer` checks that the schema provided to the coalescer 
matches the actual batch schema 
https://github.com/apache/arrow-rs/blob/a67d49758b1faee7d42fe3b215e226d6d560f237/arrow-select/src/coalesce.rs#L428,
 so the `SessionContext::sql` version would panic.
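   
   To illustrate the mismatch, here is a minimal std-only Rust sketch. It does 
not use arrow-rs or DataFusion: `Schema`, `Batch`, `Coalescer`, and 
`full_batch` are hypothetical stand-ins, and the single `flag` column is an 
assumption made for the example. The stand-in coalescer returns an error where 
the real kernel was observed to panic.

```rust
// Simplified model of the schema mismatch. All types are hypothetical
// stand-ins; this is not the arrow-rs or DataFusion API.

#[derive(Debug, Clone, PartialEq)]
struct Schema {
    fields: Vec<String>,
}

#[derive(Debug, Clone)]
struct Batch {
    schema: Schema,
    columns: Vec<Vec<i32>>,
    num_rows: usize,
}

impl Batch {
    /// Keep only the columns at `indices`. The row count is preserved, so an
    /// empty projection yields a zero-column batch that still has rows.
    fn project(&self, indices: &[usize]) -> Batch {
        Batch {
            schema: Schema {
                fields: indices.iter().map(|&i| self.schema.fields[i].clone()).collect(),
            },
            columns: indices.iter().map(|&i| self.columns[i].clone()).collect(),
            num_rows: self.num_rows,
        }
    }
}

/// Stand-in coalescer: it is created with the schema the plan declares and
/// rejects any batch whose schema differs.
struct Coalescer {
    schema: Schema,
    buffered_rows: usize,
}

impl Coalescer {
    fn new(schema: Schema) -> Self {
        Coalescer { schema, buffered_rows: 0 }
    }

    fn push_batch(&mut self, batch: &Batch) -> Result<(), String> {
        if batch.schema != self.schema {
            return Err(format!(
                "schema mismatch: expected {:?}, got {:?}",
                self.schema.fields, batch.schema.fields
            ));
        }
        self.buffered_rows += batch.num_rows;
        Ok(())
    }
}

/// An unprojected batch with a single assumed `flag` column and two rows.
fn full_batch() -> Batch {
    Batch {
        schema: Schema { fields: vec!["flag".to_string()] },
        columns: vec![vec![0, 1]],
        num_rows: 2,
    }
}

fn main() {
    // With `projection=[]` the plan declares an empty schema.
    let declared = Schema { fields: vec![] };
    let mut coalescer = Coalescer::new(declared);

    // Pushing the full batch is rejected because its schema still has `flag`.
    let full = full_batch();
    assert!(coalescer.push_batch(&full).is_err());

    // Projecting the batch to the declared (empty) projection first succeeds.
    let projected = full.project(&[]);
    assert!(coalescer.push_batch(&projected).is_ok());
    println!("buffered rows: {}", coalescer.buffered_rows);
}
```

   In this model, the rejected push corresponds to the `sql` plan 
(`schema=[]` with unprojected batches), and the accepted push corresponds to 
an execution plan that projects its batches before returning them.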
   
   The issue went away when I modified `CustomPlan::execute` to return 
projected batches, but I wanted to check whether the behavior is expected and 
whether the test should be updated, or otherwise what approaches to take. 
Thanks a lot!



