Andy Grove created ARROW-9464:
---------------------------------

Summary: [Rust] [DataFusion] Physical plan refactor to support async and optimization rules
Key: ARROW-9464
URL: https://issues.apache.org/jira/browse/ARROW-9464
Project: Apache Arrow
Issue Type: Improvement
Components: Rust, Rust - DataFusion
Reporter: Andy Grove
I would like to propose a refactor of the physical/execution planning, based on the experience I have had implementing distributed execution in Ballista. This will likely need subtasks, but here is an overview of the changes I am proposing.

1. Introduce an enum to represent the physical plan

By wrapping the execution plan structs in an enum, we make it possible to build a tree representing the physical plan, just as we do with the logical plan. This makes it easy to print physical plans and to apply transformations to them.

{code:java}
use std::sync::Arc;

pub enum PhysicalPlan {
    /// Projection.
    Projection(Arc<ProjectionExec>),
    /// Filter (a.k.a. predicate).
    Filter(Arc<FilterExec>),
    /// Hash aggregate.
    HashAggregate(Arc<HashAggregateExec>),
    /// Performs a hash join of two child relations by first shuffling the data using the join keys.
    ShuffledHashJoin(Arc<ShuffledHashJoinExec>),
    /// Performs a shuffle that will result in the desired partitioning.
    ShuffleExchange(Arc<ShuffleExchangeExec>),
    /// Reads results from a ShuffleExchange.
    ShuffleReader(Arc<ShuffleReaderExec>),
    /// Scans a partitioned data source.
    ParquetScan(Arc<ParquetScanExec>),
    /// Scans an in-memory table.
    InMemoryTableScan(Arc<InMemoryTableScanExec>),
}
{code}

2. Introduce a physical plan optimization rule to insert "shuffle" operators

We should extend the ExecutionPlan trait so that each operator can specify its input and output partitioning needs. We should then add an optimization rule that inserts any repartitioning or reordering steps that are required. This design is based on Apache Spark. For example, these are the methods to be added to ExecutionPlan:
{code:java}
/// Specifies how data is partitioned across different nodes in the cluster
fn output_partitioning(&self) -> Partitioning {
    Partitioning::UnknownPartitioning(0)
}

/// Specifies the data distribution requirements of all the children for this operator
fn required_child_distribution(&self) -> Distribution {
    Distribution::UnspecifiedDistribution
}

/// Specifies how data is ordered in each partition
fn output_ordering(&self) -> Option<Vec<SortOrder>> {
    None
}

/// Specifies the ordering requirements of each child of this operator
fn required_child_ordering(&self) -> Option<Vec<Vec<SortOrder>>> {
    None
}
{code}

Hash aggregation is a good example of where this rule applies. We perform a partial aggregate in parallel across partitions, then coalesce the results and apply a final hash aggregate. Another example would be a SortMergeExec specifying the sort order required for its children.

3. Make execution async

The ExecutionPlan trait should use the async keyword. This will require adding dependencies on the async_trait and smol crates. It would let us remove much of the manual thread management and execute queries more efficiently.

The main benefits of these changes are:

# Simpler implementation of physical operators, because the optimizer will take care of repartitioning concerns
# The ability to print a physical query plan
# More efficient query execution because of the use of async
# Easier for projects like Ballista to use DataFusion and add their own optimization rules, e.g. replacing repartitioning steps with distributed equivalents
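To make point 1 concrete, here is a minimal, self-contained sketch of how wrapping operators in an enum lets a plan be printed recursively and rewritten by a rule. It is a sketch under stated assumptions, not DataFusion code. Only three variants are modelled, and the operator structs and their fields (`columns`, `predicate`, `table`) are hypothetical stand-ins for the real ExecutionPlan structs. The names `children`, `pretty` and `remove_true_filters` are illustrative, not an existing API.

```rust
use std::sync::Arc;

// Hypothetical, heavily simplified operator structs. Real operators would
// carry schemas, physical expressions, partitioning info, and so on.
#[derive(Debug)]
pub struct ProjectionExec { pub columns: Vec<String>, pub input: PhysicalPlan }
#[derive(Debug)]
pub struct FilterExec { pub predicate: String, pub input: PhysicalPlan }
#[derive(Debug)]
pub struct InMemoryTableScanExec { pub table: String }

/// A cut-down version of the proposed enum: each variant wraps one operator.
#[derive(Debug, Clone)]
pub enum PhysicalPlan {
    Projection(Arc<ProjectionExec>),
    Filter(Arc<FilterExec>),
    InMemoryTableScan(Arc<InMemoryTableScanExec>),
}

impl PhysicalPlan {
    /// Child plans of this node (leaf scans have none).
    pub fn children(&self) -> Vec<&PhysicalPlan> {
        match self {
            PhysicalPlan::Projection(p) => vec![&p.input],
            PhysicalPlan::Filter(f) => vec![&f.input],
            PhysicalPlan::InMemoryTableScan(_) => vec![],
        }
    }

    /// One-line description of this node, without its children.
    fn describe(&self) -> String {
        match self {
            PhysicalPlan::Projection(p) => format!("ProjectionExec: {}", p.columns.join(", ")),
            PhysicalPlan::Filter(f) => format!("FilterExec: {}", f.predicate),
            PhysicalPlan::InMemoryTableScan(s) => format!("InMemoryTableScanExec: {}", s.table),
        }
    }

    /// Renders the whole tree, indenting each child two spaces deeper than its parent.
    pub fn pretty(&self) -> String {
        let mut out = String::new();
        self.pretty_into(0, &mut out);
        out
    }

    fn pretty_into(&self, depth: usize, out: &mut String) {
        out.push_str(&"  ".repeat(depth));
        out.push_str(&self.describe());
        out.push('\n');
        for child in self.children() {
            child.pretty_into(depth + 1, out);
        }
    }
}

/// Example transformation (hypothetical rule): drop filters whose predicate is
/// the literal "true". The tree is rebuilt bottom-up; leaf scans are shared by
/// cloning their Arc rather than copying the underlying struct.
pub fn remove_true_filters(plan: &PhysicalPlan) -> PhysicalPlan {
    match plan {
        PhysicalPlan::Filter(f) if f.predicate == "true" => remove_true_filters(&f.input),
        PhysicalPlan::Filter(f) => PhysicalPlan::Filter(Arc::new(FilterExec {
            predicate: f.predicate.clone(),
            input: remove_true_filters(&f.input),
        })),
        PhysicalPlan::Projection(p) => PhysicalPlan::Projection(Arc::new(ProjectionExec {
            columns: p.columns.clone(),
            input: remove_true_filters(&p.input),
        })),
        PhysicalPlan::InMemoryTableScan(_) => plan.clone(),
    }
}
```

For a projection of `a, b` over a `true` filter over a scan of table `t`, `pretty()` yields three indented lines. After `remove_true_filters`, the filter line disappears. A full implementation would cover every variant listed in point 1 in the same way.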
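The partitioning rule in point 2 could work roughly as sketched below, in the spirit of Spark's EnsureRequirements rule but not a port of it. The sketch makes several assumptions:

* `Partitioning` and `Distribution` are simplified, hypothetical versions of the types named in the proposed trait methods.
* `Plan` is a toy enum with only a scan, a hash aggregate and a `ShuffleExchange`.
* `ensure_distribution`, `satisfies` and `SHUFFLE_PARTITIONS` are illustrative names.

A final-stage aggregate requires a single partition. The rule therefore inserts an exchange beneath it, mirroring the partial/final hash aggregate example.

```rust
/// Simplified stand-in for `Partitioning`: how an operator's output is split.
#[derive(Debug, Clone, PartialEq)]
pub enum Partitioning {
    /// `n` partitions with no known relationship between rows and partitions.
    UnknownPartitioning(usize),
    /// Rows with equal key values land in the same one of `n` partitions.
    HashPartitioning(Vec<String>, usize),
    /// All rows in one partition.
    SinglePartition,
}

/// Simplified stand-in for `Distribution`: what an operator needs from its child.
#[derive(Debug, Clone, PartialEq)]
pub enum Distribution {
    UnspecifiedDistribution,
    SinglePartition,
    HashPartitioned(Vec<String>),
}

/// Hypothetical partition count for an inserted hash shuffle.
const SHUFFLE_PARTITIONS: usize = 8;

/// Toy physical plan with just enough operators for the example.
#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    Scan { partitions: usize },
    /// Partial (`final_stage == false`) or final hash aggregate grouped by `keys`.
    HashAggregate { final_stage: bool, keys: Vec<String>, input: Box<Plan> },
    /// Repartitions its input; in this sketch only the rule creates it.
    ShuffleExchange { partitioning: Partitioning, input: Box<Plan> },
}

impl Plan {
    pub fn output_partitioning(&self) -> Partitioning {
        match self {
            Plan::Scan { partitions } => Partitioning::UnknownPartitioning(*partitions),
            // An aggregate emits one output partition per input partition.
            Plan::HashAggregate { input, .. } => input.output_partitioning(),
            Plan::ShuffleExchange { partitioning, .. } => partitioning.clone(),
        }
    }

    pub fn required_child_distribution(&self) -> Distribution {
        match self {
            // The final aggregate must see all partial results together.
            Plan::HashAggregate { final_stage: true, .. } => Distribution::SinglePartition,
            _ => Distribution::UnspecifiedDistribution,
        }
    }
}

/// Returns true if data partitioned as `p` already meets requirement `d`.
pub fn satisfies(p: &Partitioning, d: &Distribution) -> bool {
    match (d, p) {
        (Distribution::UnspecifiedDistribution, _) => true,
        // A single partition trivially satisfies every requirement.
        (_, Partitioning::SinglePartition) => true,
        (Distribution::HashPartitioned(want), Partitioning::HashPartitioning(have, _)) => want == have,
        _ => false,
    }
}

/// Partitioning an inserted exchange should produce to meet `d`.
fn partitioning_for(d: &Distribution) -> Partitioning {
    match d {
        Distribution::SinglePartition => Partitioning::SinglePartition,
        Distribution::HashPartitioned(keys) => Partitioning::HashPartitioning(keys.clone(), SHUFFLE_PARTITIONS),
        Distribution::UnspecifiedDistribution => Partitioning::UnknownPartitioning(SHUFFLE_PARTITIONS),
    }
}

/// Bottom-up rule: wherever a child's output partitioning does not satisfy
/// its parent's requirement, insert a ShuffleExchange between them.
pub fn ensure_distribution(plan: Plan) -> Plan {
    let required = plan.required_child_distribution();
    match plan {
        Plan::Scan { partitions } => Plan::Scan { partitions },
        Plan::HashAggregate { final_stage, keys, input } => {
            let child = ensure_distribution(*input);
            let child = if satisfies(&child.output_partitioning(), &required) {
                child
            } else {
                Plan::ShuffleExchange { partitioning: partitioning_for(&required), input: Box::new(child) }
            };
            Plan::HashAggregate { final_stage, keys, input: Box::new(child) }
        }
        Plan::ShuffleExchange { partitioning, input } => Plan::ShuffleExchange {
            partitioning,
            input: Box::new(ensure_distribution(*input)),
        },
    }
}
```

The operators themselves never reason about repartitioning; they only declare requirements, and the rule does the rest. Running the rule a second time on its own output changes nothing. Ballista could replace this rule, or the exchange it inserts, with a distributed equivalent.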
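Point 3 proposes async_trait and smol. To keep this sketch dependency-free, it swaps in native `async fn` in traits and a tiny hand-written `block_on` in place of smol's executor. Native `async fn` in traits is stable since Rust 1.75, so it was not available when this issue was filed. The trait and the names `MemoryScan`, `FilterGt` and `collect` are hypothetical. The example shows an operator awaiting its input rather than dedicating a thread to it.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal single-future executor (stand-in for smol's block_on).
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Sleep until woken; spurious wakeups just cause another poll.
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical async execution trait: `execute` yields one partition's rows.
trait ExecutionPlan {
    fn output_partitions(&self) -> usize;
    async fn execute(&self, partition: usize) -> Vec<i64>;
}

/// In-memory scan holding one Vec of values per partition.
struct MemoryScan {
    partitions: Vec<Vec<i64>>,
}

impl ExecutionPlan for MemoryScan {
    fn output_partitions(&self) -> usize {
        self.partitions.len()
    }
    async fn execute(&self, partition: usize) -> Vec<i64> {
        self.partitions[partition].clone()
    }
}

/// Keeps rows greater than `min`, awaiting its input instead of blocking on it.
struct FilterGt<P> {
    input: P,
    min: i64,
}

impl<P: ExecutionPlan> ExecutionPlan for FilterGt<P> {
    fn output_partitions(&self) -> usize {
        self.input.output_partitions()
    }
    async fn execute(&self, partition: usize) -> Vec<i64> {
        let rows = self.input.execute(partition).await;
        rows.into_iter().filter(|v| *v > self.min).collect()
    }
}

/// Runs every partition and concatenates the results in partition order.
/// Partitions are awaited one after another here only to keep the sketch short.
async fn collect<P: ExecutionPlan>(plan: &P) -> Vec<i64> {
    let mut out = Vec::new();
    for p in 0..plan.output_partitions() {
        out.extend(plan.execute(p).await);
    }
    out
}
```

With two partitions `[1, 5, 3]` and `[7, 2]` and `min = 2`, `block_on(collect(&plan))` returns `[5, 3, 7]`. A real engine would instead spawn partitions as concurrent tasks on a multi-threaded executor. Note that `async fn` in traits is not object-safe, which is one reason a `dyn ExecutionPlan` design would still need something like async_trait.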