milenkovicm opened a new pull request, #1649:
URL: https://github.com/apache/datafusion-ballista/pull/1649

   # Which issue does this PR close?
   
   Mentioned in https://github.com/apache/datafusion-ballista/pull/1647#issuecomment-4365479608
   
    # Rationale for this change
   
   The AQE planner currently splits the whole plan into stages eagerly:
   
   ```
   AdaptiveDatafusionExec: is_final=false, plan_id=4, stage_id=pending
         ProjectionExec: expr=[sum(t0.c0)@1 as sum(t0.c0)]
           AggregateExec: mode=FinalPartitioned, gby=[c0@0 as c0], aggr=[sum(t0.c0)]
             ExchangeExec: partitioning=Hash([c0@0], 2), plan_id=3, stage_id=pending, stage_resolved=false
               AggregateExec: mode=Partial, gby=[c0@0 as c0], aggr=[sum(t0.c0)]
                 HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(p2@0, c2@1)], projection=[c0@1]
                   CoalescePartitionsExec
                     ExchangeExec: partitioning=None, plan_id=1, stage_id=pending, stage_resolved=false
                       ProjectionExec: expr=[c@0 as p2]
                         AggregateExec: mode=FinalPartitioned, gby=[c@0 as c], aggr=[]
                           ExchangeExec: partitioning=Hash([c@0], 2), plan_id=0, stage_id=pending, stage_resolved=false
                             AggregateExec: mode=Partial, gby=[c@0 as c], aggr=[]
                               DataSourceExec: partitions=1, partition_sizes=[1]
                   ProjectionExec: expr=[min(t.a)@1 as c0, c@0 as c2]
                     AggregateExec: mode=FinalPartitioned, gby=[c@0 as c], aggr=[min(t.a)]
                       ExchangeExec: partitioning=Hash([c@0], 2), plan_id=2, stage_id=pending, stage_resolved=false
                         AggregateExec: mode=Partial, gby=[c@1 as c], aggr=[min(t.a)]
                           DataSourceExec: partitions=1, partition_sizes=[1]
   ```
   
   Plan execution is prescribed up front based on little to no information, which makes re-planning harder (if not impossible). This might have implications for broadcasting, partition coalescing, and similar optimisations.
   
   The suggestion is to change this so that AQE plans one step at a time:
   
   
   ```
   AdaptiveDatafusionExec: is_final=false, plan_id=2, stage_id=pending
         ProjectionExec: expr=[sum(t0.c0)@1 as sum(t0.c0)]
           AggregateExec: mode=FinalPartitioned, gby=[c0@0 as c0], aggr=[sum(t0.c0)]
             RepartitionExec: partitioning=Hash([c0@0], 2), input_partitions=2
               AggregateExec: mode=Partial, gby=[c0@0 as c0], aggr=[sum(t0.c0)]
                 HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(p2@0, c2@1)], projection=[c0@1]
                   CoalescePartitionsExec
                     ProjectionExec: expr=[c@0 as p2]
                       AggregateExec: mode=FinalPartitioned, gby=[c@0 as c], aggr=[]
                         ExchangeExec: partitioning=Hash([c@0], 2), plan_id=0, stage_id=pending, stage_resolved=false
                           AggregateExec: mode=Partial, gby=[c@0 as c], aggr=[]
                             DataSourceExec: partitions=1, partition_sizes=[1]
                   ProjectionExec: expr=[min(t.a)@1 as c0, c@0 as c2]
                     AggregateExec: mode=FinalPartitioned, gby=[c@0 as c], aggr=[min(t.a)]
                       ExchangeExec: partitioning=Hash([c@0], 2), plan_id=1, stage_id=pending, stage_resolved=false
                         AggregateExec: mode=Partial, gby=[c@1 as c], aggr=[min(t.a)]
                           DataSourceExec: partitions=1, partition_sizes=[1]
   ```
   
   Note that there are only two `ExchangeExec` nodes in this implementation.
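
The idea of planning one step at a time can be illustrated with a small, self-contained sketch. It is not the code in this PR: the `Plan` enum and the functions `plan_one_step`, `has_pending_repartition`, `count` and `example_plan` are hypothetical names on a toy tree, and no Ballista or DataFusion types are used. The assumption made here is that a repartition becomes a stage boundary only when no other pending repartition sits below it, so the upper shuffle stays an ordinary repartition until more is known.

```rust
// Toy plan tree. Every name here is hypothetical; these are NOT Ballista or DataFusion types.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan,
    Op { name: &'static str, children: Vec<Plan> },
    // A shuffle that has not been turned into a stage boundary yet.
    Repartition(Box<Plan>),
    // A shuffle that has been committed as a stage boundary.
    Exchange(Box<Plan>),
}

// True if the subtree still contains a repartition that is not a stage boundary.
fn has_pending_repartition(plan: &Plan) -> bool {
    match plan {
        Plan::Scan => false,
        Plan::Repartition(_) => true,
        Plan::Exchange(child) => has_pending_repartition(child),
        Plan::Op { children, .. } => children.iter().any(has_pending_repartition),
    }
}

// One planning step: only the lowest pending repartitions become exchanges.
// Assumption of this sketch: a real planner would run the next step only after
// those lower stages have executed and produced runtime statistics.
fn plan_one_step(plan: Plan) -> Plan {
    match plan {
        Plan::Scan => Plan::Scan,
        Plan::Repartition(child) => {
            if has_pending_repartition(&child) {
                // Something below is still undecided, so keep this shuffle re-plannable.
                Plan::Repartition(Box::new(plan_one_step(*child)))
            } else {
                Plan::Exchange(child)
            }
        }
        Plan::Exchange(child) => Plan::Exchange(Box::new(plan_one_step(*child))),
        Plan::Op { name, children } => Plan::Op {
            name,
            children: children.into_iter().map(plan_one_step).collect(),
        },
    }
}

// Counts the nodes in the tree for which `is_match` returns true.
fn count(plan: &Plan, is_match: fn(&Plan) -> bool) -> usize {
    let here = usize::from(is_match(plan));
    let below: usize = match plan {
        Plan::Scan => 0,
        Plan::Repartition(c) | Plan::Exchange(c) => count(c, is_match),
        Plan::Op { children, .. } => children.iter().map(|c| count(c, is_match)).sum(),
    };
    here + below
}

fn op(name: &'static str, children: Vec<Plan>) -> Plan {
    Plan::Op { name, children }
}

// Rough shape of the query above: two aggregated inputs joined, then aggregated again.
fn example_plan() -> Plan {
    let side = || {
        op(
            "FinalAggregate",
            vec![Plan::Repartition(Box::new(op("PartialAggregate", vec![Plan::Scan])))],
        )
    };
    let join = op("HashJoin", vec![side(), side()]);
    op(
        "FinalAggregate",
        vec![Plan::Repartition(Box::new(op("PartialAggregate", vec![join])))],
    )
}
```

Applying `plan_one_step` once to `example_plan()` turns the two repartitions under the join into exchanges and leaves the one above the join as a repartition, which mirrors the shape of the second plan above.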
   
   # What changes are included in this PR?
   
   - [x] Change `DistributedExchangeRule` to limit its blast radius
   - [ ] Run `DistributedExchangeRule` after all other rules, giving the DataFusion (and added) planners the ability to re-optimise the plan
   - [ ] Find a way to stop re-planning, as `if DistributedExchangeRule::default().is_plan_transformed(plan.clone())?` may not work if we move `DistributedExchangeRule`
   - [ ] Prove that collect left will be preserved based on available runtime statistics
   
   # Are there any user-facing changes?
   



