milenkovicm opened a new pull request, #1649: URL: https://github.com/apache/datafusion-ballista/pull/1649
# Which issue does this PR close?

Mentioned in https://github.com/apache/datafusion-ballista/pull/1647#issuecomment-4365479608

# Rationale for this change

The AQE planner eagerly splits the plan into stages:

```
AdaptiveDatafusionExec: is_final=false, plan_id=4, stage_id=pending
  ProjectionExec: expr=[sum(t0.c0)@1 as sum(t0.c0)]
    AggregateExec: mode=FinalPartitioned, gby=[c0@0 as c0], aggr=[sum(t0.c0)]
      ExchangeExec: partitioning=Hash([c0@0], 2), plan_id=3, stage_id=pending, stage_resolved=false
        AggregateExec: mode=Partial, gby=[c0@0 as c0], aggr=[sum(t0.c0)]
          HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(p2@0, c2@1)], projection=[c0@1]
            CoalescePartitionsExec
              ExchangeExec: partitioning=None, plan_id=1, stage_id=pending, stage_resolved=false
                ProjectionExec: expr=[c@0 as p2]
                  AggregateExec: mode=FinalPartitioned, gby=[c@0 as c], aggr=[]
                    ExchangeExec: partitioning=Hash([c@0], 2), plan_id=0, stage_id=pending, stage_resolved=false
                      AggregateExec: mode=Partial, gby=[c@0 as c], aggr=[]
                        DataSourceExec: partitions=1, partition_sizes=[1]
            ProjectionExec: expr=[min(t.a)@1 as c0, c@0 as c2]
              AggregateExec: mode=FinalPartitioned, gby=[c@0 as c], aggr=[min(t.a)]
                ExchangeExec: partitioning=Hash([c@0], 2), plan_id=2, stage_id=pending, stage_resolved=false
                  AggregateExec: mode=Partial, gby=[c@1 as c], aggr=[min(t.a)]
                    DataSourceExec: partitions=1, partition_sizes=[1]
```

Plan execution is prescribed up front based on little to no information, which makes re-planning harder (if not impossible). This may have implications for broadcasting, partition coalescing, ...
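To make the difference between the two strategies concrete, here is a minimal Rust sketch on a hypothetical toy plan tree. `Node`, `eager`, `one_step`, `resolve` and `demo` are illustrative names only, not Ballista or DataFusion APIs. The sketch assumes that planning "one step at a time" means promoting to a stage only those distribution boundaries whose input contains no undecided boundary and no unfinished stage, which is consistent with the proposed plan below (only the two lowest `ExchangeExec` nodes are created).

```rust
/// Hypothetical toy plan tree, used only to contrast two ways of placing
/// stage boundaries. This is NOT DataFusion's `ExecutionPlan` and NOT
/// Ballista's `DistributedExchangeRule`.
#[derive(Debug, Clone)]
pub enum Node {
    /// Leaf input (stands in for `DataSourceExec` or a finished stage's output).
    Scan,
    /// Data must be redistributed here (hash repartition or coalesce),
    /// but no stage boundary has been decided yet.
    Boundary(Box<Node>),
    /// A planned stage boundary whose stage has not run yet
    /// (stands in for `ExchangeExec ... stage_resolved=false`).
    Exchange(Box<Node>),
    /// Any other operator: projection, aggregate, join, ...
    Op(Vec<Node>),
}

fn op(children: Vec<Node>) -> Node {
    Node::Op(children)
}

fn boundary(input: Node) -> Node {
    Node::Boundary(Box::new(input))
}

impl Node {
    /// True if this subtree still holds an undecided boundary or an unfinished stage.
    fn has_pending_work(&self) -> bool {
        match self {
            Node::Scan => false,
            Node::Boundary(_) | Node::Exchange(_) => true,
            Node::Op(children) => children.iter().any(Node::has_pending_work),
        }
    }
}

/// Eager strategy: every boundary becomes a stage up front,
/// before any stage has produced runtime statistics.
pub fn eager(node: &Node) -> Node {
    match node {
        Node::Scan => Node::Scan,
        Node::Boundary(input) | Node::Exchange(input) => Node::Exchange(Box::new(eager(input))),
        Node::Op(children) => op(children.iter().map(eager).collect()),
    }
}

/// Step-wise strategy (assumed): promote a boundary only if its input can run now.
pub fn one_step(node: &Node) -> Node {
    match node {
        Node::Scan => Node::Scan,
        // Input has no pending work: this boundary becomes a stage now.
        Node::Boundary(input) if !input.has_pending_work() => {
            Node::Exchange(Box::new(one_step(input)))
        }
        // Input still depends on other stages: defer the decision.
        Node::Boundary(input) => boundary(one_step(input)),
        Node::Exchange(input) => Node::Exchange(Box::new(one_step(input))),
        Node::Op(children) => op(children.iter().map(one_step).collect()),
    }
}

/// Simulates finished stages: each executed exchange becomes a plain input.
pub fn resolve(node: &Node) -> Node {
    match node {
        Node::Scan | Node::Exchange(_) => Node::Scan,
        Node::Boundary(input) => boundary(resolve(input)),
        Node::Op(children) => op(children.iter().map(resolve).collect()),
    }
}

pub fn count_exchanges(node: &Node) -> usize {
    match node {
        Node::Scan => 0,
        Node::Exchange(input) => 1 + count_exchanges(input),
        Node::Boundary(input) => count_exchanges(input),
        Node::Op(children) => children.iter().map(count_exchanges).sum(),
    }
}

/// Toy shape of the plan above: two aggregated inputs joined with
/// CollectLeft (build side coalesced), then a final aggregation.
pub fn example_plan() -> Node {
    let build_input = boundary(op(vec![Node::Scan])); // hash repartition after partial agg
    let probe_input = boundary(op(vec![Node::Scan])); // hash repartition after partial agg
    let build_side = boundary(op(vec![build_input])); // coalesce for CollectLeft
    let probe_side = op(vec![probe_input]);
    let join = op(vec![build_side, probe_side]);
    op(vec![boundary(op(vec![join]))]) // hash repartition before final agg
}

/// Returns (exchanges planned eagerly, exchanges planned in each step-wise round).
pub fn demo() -> (usize, Vec<usize>) {
    let plan = example_plan();
    let eager_count = count_exchanges(&eager(&plan));
    let mut rounds = Vec::new();
    let mut current = plan;
    loop {
        let planned = one_step(&current);
        let n = count_exchanges(&planned);
        if n == 0 {
            break;
        }
        rounds.push(n);
        current = resolve(&planned); // pretend the new stages have run
    }
    (eager_count, rounds)
}
```

In this toy model `demo()` returns `(4, vec![2, 1, 1])`: the eager pass fixes all four boundaries at once, while the step-wise pass reaches the same four boundaries over three rounds, each decided only after the stages feeding it have finished, which is when runtime statistics could inform choices such as broadcasting or coalescing.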
The suggestion is to change this so that AQE plans one step at a time:

```
AdaptiveDatafusionExec: is_final=false, plan_id=2, stage_id=pending
  ProjectionExec: expr=[sum(t0.c0)@1 as sum(t0.c0)]
    AggregateExec: mode=FinalPartitioned, gby=[c0@0 as c0], aggr=[sum(t0.c0)]
      RepartitionExec: partitioning=Hash([c0@0], 2), input_partitions=2
        AggregateExec: mode=Partial, gby=[c0@0 as c0], aggr=[sum(t0.c0)]
          HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(p2@0, c2@1)], projection=[c0@1]
            CoalescePartitionsExec
              ProjectionExec: expr=[c@0 as p2]
                AggregateExec: mode=FinalPartitioned, gby=[c@0 as c], aggr=[]
                  ExchangeExec: partitioning=Hash([c@0], 2), plan_id=0, stage_id=pending, stage_resolved=false
                    AggregateExec: mode=Partial, gby=[c@0 as c], aggr=[]
                      DataSourceExec: partitions=1, partition_sizes=[1]
            ProjectionExec: expr=[min(t.a)@1 as c0, c@0 as c2]
              AggregateExec: mode=FinalPartitioned, gby=[c@0 as c], aggr=[min(t.a)]
                ExchangeExec: partitioning=Hash([c@0], 2), plan_id=1, stage_id=pending, stage_resolved=false
                  AggregateExec: mode=Partial, gby=[c@1 as c], aggr=[min(t.a)]
                    DataSourceExec: partitions=1, partition_sizes=[1]
```

Note that this plan contains only two `ExchangeExec` nodes.

# What changes are included in this PR?

- [x] Change `DistributedExchangeRule` to limit its blast radius
- [ ] Run `DistributedExchangeRule` after all other rules, giving DataFusion (and added) planners the ability to re-optimise the plan
- [ ] Find a way to stop re-planning, as `if DistributedExchangeRule::default().is_plan_transformed(plan.clone())?` may not work if we move `DistributedExchangeRule`
- [ ] Prove that `CollectLeft` will be preserved based on available runtime statistics

# Are there any user-facing changes?
