alamb opened a new pull request #1018:
URL: https://github.com/apache/arrow-datafusion/pull/1018


   # Which issue does this PR close?
   
   Finally 🎉  closes https://github.com/apache/arrow-datafusion/issues/866 
(following the same model as 
https://github.com/apache/arrow-datafusion/pull/1004). 
   
   There are still some operators, such as the Parquet, CSV, Avro, and JSON 
sources, that are not instrumented, but I don't have time to devote to 
instrumenting them now. I will file a follow-on ticket to track that work.
   
   
   # Rationale for this change
   We want a basic understanding of where a plan's time is spent and in which 
operators. See https://github.com/apache/arrow-datafusion/issues/866 for more 
details.
   
   # What changes are included in this PR?
   1. Instrument `WindowAggExec` and `UnionExec`, using the API from 
https://github.com/apache/arrow-datafusion/pull/909
   2. Tweak instrumentation for `CoalescePartitionsExec` so it reports 
`elapsed_compute`
   3. Tests for the above
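
As a rough illustration of the instrumentation pattern described above (count output rows, and time the operator's own compute work with a timer that records on drop), here is a minimal stdlib-only sketch. `OperatorMetrics`, `ComputeTimer`, and `union_batches` are hypothetical names invented for this example; they are not the DataFusion metrics API from the linked PR, and the real operators work on record batch streams rather than `Vec<i64>`.

```rust
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::Instant;

/// Hypothetical per-operator metrics (illustrative only, not DataFusion's types).
#[derive(Default, Debug)]
struct OperatorMetrics {
    output_rows: AtomicUsize,
    elapsed_compute_nanos: AtomicU64,
}

/// Guard that adds the time since its creation to `elapsed_compute_nanos` when dropped.
struct ComputeTimer<'a> {
    metrics: &'a OperatorMetrics,
    start: Instant,
}

impl OperatorMetrics {
    /// Start timing a region of compute work.
    fn timer(&self) -> ComputeTimer<'_> {
        ComputeTimer { metrics: self, start: Instant::now() }
    }

    /// Record that `rows` rows were produced by the operator.
    fn record_output(&self, rows: usize) {
        self.output_rows.fetch_add(rows, Ordering::Relaxed);
    }
}

impl Drop for ComputeTimer<'_> {
    fn drop(&mut self) {
        let nanos = self.start.elapsed().as_nanos() as u64;
        self.metrics
            .elapsed_compute_nanos
            .fetch_add(nanos, Ordering::Relaxed);
    }
}

/// Toy "union" operator: concatenates the batches of all inputs,
/// recording output rows and the time spent doing so.
fn union_batches(inputs: Vec<Vec<Vec<i64>>>, metrics: &OperatorMetrics) -> Vec<Vec<i64>> {
    let _timer = metrics.timer(); // stops when it goes out of scope
    let mut out = Vec::new();
    for input in inputs {
        for batch in input {
            metrics.record_output(batch.len());
            out.push(batch);
        }
    }
    out
}

fn main() {
    let metrics = OperatorMetrics::default();
    let inputs = vec![vec![vec![1, 2]], vec![vec![3]]];
    let out = union_batches(inputs, &metrics);
    println!(
        "batches={}, output_rows={}, elapsed_compute={}ns",
        out.len(),
        metrics.output_rows.load(Ordering::Relaxed),
        metrics.elapsed_compute_nanos.load(Ordering::Relaxed)
    );
}
```

The drop-based guard means every exit path from the timed region is accounted for without an explicit "stop" call.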
   
   # Are there any user-facing changes?
   More fields in `EXPLAIN ANALYZE` output are now filled out.
   
   Example of how `EXPLAIN ANALYZE` output now looks (dense but packed with good info). 
I find it quite cool that DataFusion can even plan and execute such queries.
   
   ```sql
   running query: EXPLAIN ANALYZE SELECT count(*) as cnt FROM (SELECT count(*), c1 FROM aggregate_test_100 WHERE c13 != 'C2GT5KVyOPZpgKVl110TyZO0NcJ434' GROUP BY c1 ORDER BY c1 ) UNION ALL SELECT 1 as cnt UNION ALL SELECT lead(c1, 1) OVER () as cnt FROM (select 1 as c1) LIMIT 2
   Query Output:
   
   +-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type         | plan |
   +-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | Plan with Metrics | GlobalLimitExec: limit=2, metrics=[output_rows=2, elapsed_compute=3.023µs] |
   |                   |   CoalescePartitionsExec, metrics=[output_rows=3, elapsed_compute=50.216µs] |
   |                   |     LocalLimitExec: limit=2, metrics=[output_rows=3, elapsed_compute=699ns] |
   |                   |       UnionExec, metrics=[output_rows=3, elapsed_compute=198.269µs] |
   |                   |         RepartitionExec: partitioning=RoundRobinBatch(3), metrics=[repart_time{inputPartition=0}=NOT RECORDED, fetch_time{inputPartition=0}=11.908387ms, send_time{inputPartition=0}=3.816µs] |
   |                   |           GlobalLimitExec: limit=2, metrics=[output_rows=1, elapsed_compute=157ns] |
   |                   |             ProjectionExec: expr=[COUNT(UInt8(1))@0 as cnt], metrics=[output_rows=1, elapsed_compute=4.125µs] |
   |                   |               HashAggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))], metrics=[output_rows=1, elapsed_compute=66.501µs] |
   |                   |                 CoalescePartitionsExec, metrics=[output_rows=3, elapsed_compute=11.969µs] |
   |                   |                   HashAggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))], metrics=[output_rows=3, elapsed_compute=89.888µs] |
   |                   |                     RepartitionExec: partitioning=RoundRobinBatch(3), metrics=[fetch_time{inputPartition=0}=11.049667ms, send_time{inputPartition=0}=3.965µs, repart_time{inputPartition=0}=NOT RECORDED] |
   |                   |                       SortExec: [c1@1 ASC], metrics=[output_rows=5, elapsed_compute=196.635µs] |
   |                   |                         CoalescePartitionsExec, metrics=[output_rows=5, elapsed_compute=11.127µs] |
   |                   |                           ProjectionExec: expr=[COUNT(UInt8(1))@1 as COUNT(UInt8(1)), c1@0 as c1], metrics=[output_rows=5, elapsed_compute=17.878µs] |
   |                   |                             HashAggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[COUNT(UInt8(1))], metrics=[output_rows=5, elapsed_compute=308.373µs] |
   |                   |                               CoalesceBatchesExec: target_batch_size=4096, metrics=[output_rows=5, elapsed_compute=77.054µs] |
   |                   |                                 RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 3), metrics=[send_time{inputPartition=0}=NOT RECORDED, repart_time{inputPartition=0}=200.022µs, fetch_time{inputPartition=0}=28.377811ms] |
   |                   |                                   HashAggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[COUNT(UInt8(1))], metrics=[output_rows=5, elapsed_compute=585.625µs] |
   |                   |                                     CoalesceBatchesExec: target_batch_size=4096, metrics=[output_rows=99, elapsed_compute=268.21µs] |
   |                   |                                       FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434, metrics=[output_rows=99, elapsed_compute=228.181µs] |
   |                   |                                         RepartitionExec: partitioning=RoundRobinBatch(3), metrics=[repart_time{inputPartition=0}=NOT RECORDED, fetch_time{inputPartition=0}=6.765885ms, send_time{inputPartition=0}=3.554µs] |
   |                   |                                           CsvExec: source=Path(/Users/alamb/Software/arrow/testing/data/csv/aggregate_test_100.csv: [/Users/alamb/Software/arrow/testing/data/csv/aggregate_test_100.csv]), has_header=true, metrics=[] |
   |                   |         RepartitionExec: partitioning=RoundRobinBatch(3), metrics=[fetch_time{inputPartition=0}=131.806µs, send_time{inputPartition=0}=11.372µs, repart_time{inputPartition=0}=NOT RECORDED] |
   |                   |           GlobalLimitExec: limit=2, metrics=[output_rows=1, elapsed_compute=211ns] |
   |                   |             ProjectionExec: expr=[1 as cnt], metrics=[output_rows=1, elapsed_compute=60.248µs] |
   |                   |               EmptyExec: produce_one_row=true, metrics=[] |
   |                   |         RepartitionExec: partitioning=RoundRobinBatch(3), metrics=[repart_time{inputPartition=0}=NOT RECORDED, fetch_time{inputPartition=0}=10.627386ms, send_time{inputPartition=0}=2.322µs] |
   |                   |           GlobalLimitExec: limit=2, metrics=[output_rows=1, elapsed_compute=287ns] |
   |                   |             CoalescePartitionsExec, metrics=[output_rows=1, elapsed_compute=11.487µs] |
   |                   |               ProjectionExec: expr=[LEAD(c1,Int64(1))@0 as cnt], metrics=[output_rows=1, elapsed_compute=5.57µs] |
   |                   |                 RepartitionExec: partitioning=RoundRobinBatch(3), metrics=[repart_time{inputPartition=0}=NOT RECORDED, send_time{inputPartition=0}=4.463µs, fetch_time{inputPartition=0}=618.203µs] |
   |                   |                   WindowAggExec: wdw=[LEAD(c1,Int64(1)): Ok(Field { name: "LEAD(c1,Int64(1))", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None })], metrics=[output_rows=1, elapsed_compute=140.931µs] |
   |                   |                     ProjectionExec: expr=[1 as c1], metrics=[output_rows=1, elapsed_compute=9.968µs] |
   |                   |                       EmptyExec: produce_one_row=true, metrics=[] |
   +-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   ```
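
Since the plan output is dense, one way to see which operators dominate is to pull `elapsed_compute` out of each plan line and sort. The following is a minimal sketch, assuming the plan text is available as plain strings in the format shown above; `parse_duration_nanos` and `elapsed_compute` are hypothetical helpers written for this example, not part of DataFusion. Operators that report other metrics (such as `RepartitionExec`) or none at all are simply skipped.

```rust
/// Parse a duration such as "699ns", "3.023µs", "11.908387ms" or "1.5s" into nanoseconds.
fn parse_duration_nanos(s: &str) -> Option<f64> {
    // Longer suffixes are checked before the bare "s".
    for &(suffix, scale) in &[("ns", 1.0), ("µs", 1e3), ("ms", 1e6), ("s", 1e9)] {
        if let Some(num) = s.strip_suffix(suffix) {
            return num.parse::<f64>().ok().map(|v| v * scale);
        }
    }
    None
}

/// Extract (operator name, elapsed_compute in ns) from one plan line, if present.
fn elapsed_compute(line: &str) -> Option<(String, f64)> {
    // The operator name ends at the first ':' or ','.
    let name = line
        .trim()
        .split(|c: char| c == ':' || c == ',')
        .next()?
        .to_string();
    let key = "elapsed_compute=";
    let start = line.find(key)? + key.len();
    let rest = &line[start..];
    // The value ends at the next ',' or the closing ']'.
    let end = rest
        .find(|c: char| c == ',' || c == ']')
        .unwrap_or(rest.len());
    parse_duration_nanos(&rest[..end]).map(|ns| (name, ns))
}

fn main() {
    // A few lines copied from the plan above.
    let plan = [
        "GlobalLimitExec: limit=2, metrics=[output_rows=2, elapsed_compute=3.023µs]",
        "  CoalescePartitionsExec, metrics=[output_rows=3, elapsed_compute=50.216µs]",
        "    LocalLimitExec: limit=2, metrics=[output_rows=3, elapsed_compute=699ns]",
        "      UnionExec, metrics=[output_rows=3, elapsed_compute=198.269µs]",
        "        EmptyExec: produce_one_row=true, metrics=[]",
    ];
    let mut rows: Vec<(String, f64)> = plan.iter().filter_map(|l| elapsed_compute(l)).collect();
    // Most expensive operator first.
    rows.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
    for (name, ns) in rows {
        println!("{:<24} {:>12.0} ns", name, ns);
    }
}
```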
   



