[ 
https://issues.apache.org/jira/browse/ARROW-9464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Grove reassigned ARROW-9464:
---------------------------------

    Assignee: Andy Grove

> [Rust] [DataFusion] Physical plan refactor to support async and optimization 
> rules
> ----------------------------------------------------------------------------------
>
>                 Key: ARROW-9464
>                 URL: https://issues.apache.org/jira/browse/ARROW-9464
>             Project: Apache Arrow
>          Issue Type: Improvement
>          Components: Rust, Rust - DataFusion
>            Reporter: Andy Grove
>            Assignee: Andy Grove
>            Priority: Major
>
> I would like to propose a refactor of the physical/execution planning based 
> on the experience I have had in implementing distributed execution in 
> Ballista.
> This will likely need subtasks but here is an overview of the changes I am 
> proposing.
> h3. *Introduce enum to represent physical plan.*
> By wrapping the execution plan structs in an enum, we make it possible to 
> build a tree representing the physical plan just like we do with the logical 
> plan. This makes it easy to print physical plans and also to apply 
> transformations to them.
> {code:java}
>  pub enum PhysicalPlan {
>     /// Projection.
>     Projection(Arc<ProjectionExec>),
>     /// Filter, a.k.a. predicate.
>     Filter(Arc<FilterExec>),
>     /// Hash aggregate
>     HashAggregate(Arc<HashAggregateExec>),
>     /// Performs a hash join of two child relations by first shuffling the
>     /// data using the join keys.
>     ShuffledHashJoin(ShuffledHashJoinExec),
>     /// Performs a shuffle that will result in the desired partitioning.
>     ShuffleExchange(Arc<ShuffleExchangeExec>),
>     /// Reads results from a ShuffleExchange
>     ShuffleReader(Arc<ShuffleReaderExec>),
>     /// Scans a partitioned data source
>     ParquetScan(Arc<ParquetScanExec>),
>     /// Scans an in-memory table
>     InMemoryTableScan(Arc<InMemoryTableScanExec>),
> }{code}
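
As a rough illustration of why the enum helps, the sketch below prints a plan as an indented tree and applies one simple transformation. It is a minimal sketch only: the struct fields (`path`, `predicate`, `columns`, `child`) and the helpers `children`, `display_indent`, and `remove_trivial_filters` are hypothetical names chosen for this example and are not part of the proposal or of DataFusion.

```rust
use std::sync::Arc;

// Hypothetical, heavily simplified stand-ins for the operator structs.
#[derive(Debug)]
pub struct ParquetScanExec {
    pub path: String,
}

#[derive(Debug)]
pub struct FilterExec {
    pub predicate: String,
    pub child: Arc<PhysicalPlan>,
}

#[derive(Debug)]
pub struct ProjectionExec {
    pub columns: Vec<String>,
    pub child: Arc<PhysicalPlan>,
}

/// Reduced version of the proposed enum (only three variants).
#[derive(Debug, Clone)]
pub enum PhysicalPlan {
    Projection(Arc<ProjectionExec>),
    Filter(Arc<FilterExec>),
    ParquetScan(Arc<ParquetScanExec>),
}

impl PhysicalPlan {
    /// Child plans of this node (assumed helper, not in the proposal).
    pub fn children(&self) -> Vec<Arc<PhysicalPlan>> {
        match self {
            PhysicalPlan::Projection(p) => vec![p.child.clone()],
            PhysicalPlan::Filter(f) => vec![f.child.clone()],
            PhysicalPlan::ParquetScan(_) => vec![],
        }
    }

    /// One-line description of this node only.
    fn describe(&self) -> String {
        match self {
            PhysicalPlan::Projection(p) => format!("Projection: {}", p.columns.join(", ")),
            PhysicalPlan::Filter(f) => format!("Filter: {}", f.predicate),
            PhysicalPlan::ParquetScan(s) => format!("ParquetScan: {}", s.path),
        }
    }

    /// Renders the plan as an indented tree, one operator per line.
    pub fn display_indent(&self) -> String {
        let mut out = String::new();
        self.write_indent(0, &mut out);
        out
    }

    fn write_indent(&self, depth: usize, out: &mut String) {
        out.push_str(&"  ".repeat(depth));
        out.push_str(&self.describe());
        out.push('\n');
        for child in self.children() {
            child.write_indent(depth + 1, out);
        }
    }
}

/// Example transformation: rebuilds the tree bottom-up and drops any filter
/// whose predicate is the literal string "true".
pub fn remove_trivial_filters(plan: &PhysicalPlan) -> PhysicalPlan {
    match plan {
        PhysicalPlan::Projection(p) => PhysicalPlan::Projection(Arc::new(ProjectionExec {
            columns: p.columns.clone(),
            child: Arc::new(remove_trivial_filters(&p.child)),
        })),
        PhysicalPlan::Filter(f) if f.predicate == "true" => remove_trivial_filters(&f.child),
        PhysicalPlan::Filter(f) => PhysicalPlan::Filter(Arc::new(FilterExec {
            predicate: f.predicate.clone(),
            child: Arc::new(remove_trivial_filters(&f.child)),
        })),
        PhysicalPlan::ParquetScan(s) => PhysicalPlan::ParquetScan(s.clone()),
    }
}
```

Because every node is reachable through a single `match`, both printing and rewriting become plain recursive functions over the enum.
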
> h3. *Introduce physical plan optimization rule to insert "shuffle" operators*
> We should extend the ExecutionPlan trait so that each operator can specify 
> its input and output partitioning needs, and then have an optimization rule 
> that can insert any repartitioning or reordering steps required.
> For example, these are the methods to be added to ExecutionPlan. This design 
> is based on Apache Spark.
>  
> {code:java}
> /// Specifies how data is partitioned across different nodes in the cluster
> fn output_partitioning(&self) -> Partitioning {
>     Partitioning::UnknownPartitioning(0)
> }
> /// Specifies the data distribution requirements of all the children for this
> /// operator
> fn required_child_distribution(&self) -> Distribution {
>     Distribution::UnspecifiedDistribution
> }
> /// Specifies how data is ordered in each partition
> fn output_ordering(&self) -> Option<Vec<SortOrder>> {
>     None
> }
> /// Specifies the sort ordering requirements of all the children for this
> /// operator
> fn required_child_ordering(&self) -> Option<Vec<Vec<SortOrder>>> {
>     None
> }
>  {code}
> A good example of applying this rule would be in the case of hash aggregates 
> where we perform a partial aggregate in parallel across partitions and then 
> coalesce the results and apply a final hash aggregate.
> Another example would be a SortMergeExec specifying the sort order required 
> for its children.
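
One possible shape for such a rule is sketched below. It is an illustration under stated assumptions rather than the proposed implementation: the `Plan` enum, the `HashPartitioning` and `ClusteredDistribution` variants, and the `satisfies` and `ensure_requirements` functions are hypothetical names, and the aggregate here is simply repartitioned by its grouping keys instead of being split into the partial and final aggregates described above.

```rust
/// How the output of an operator is partitioned (variants are assumptions).
#[derive(Debug, Clone, PartialEq)]
pub enum Partitioning {
    UnknownPartitioning(usize),
    HashPartitioning(usize, Vec<String>),
}

/// What an operator requires of its children (variants are assumptions).
#[derive(Debug, Clone, PartialEq)]
pub enum Distribution {
    UnspecifiedDistribution,
    ClusteredDistribution(Vec<String>),
}

/// Hypothetical, simplified plan with just enough operators for the rule.
#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    Scan { partitions: usize },
    HashAggregate { keys: Vec<String>, input: Box<Plan> },
    ShuffleExchange { partitioning: Partitioning, input: Box<Plan> },
}

impl Plan {
    pub fn output_partitioning(&self) -> Partitioning {
        match self {
            Plan::Scan { partitions } => Partitioning::UnknownPartitioning(*partitions),
            Plan::HashAggregate { input, .. } => input.output_partitioning(),
            Plan::ShuffleExchange { partitioning, .. } => partitioning.clone(),
        }
    }

    pub fn required_child_distribution(&self) -> Distribution {
        match self {
            Plan::HashAggregate { keys, .. } => Distribution::ClusteredDistribution(keys.clone()),
            _ => Distribution::UnspecifiedDistribution,
        }
    }
}

/// Returns true if data partitioned as `p` already meets requirement `d`.
fn satisfies(p: &Partitioning, d: &Distribution) -> bool {
    match (p, d) {
        (_, Distribution::UnspecifiedDistribution) => true,
        (Partitioning::HashPartitioning(_, cols), Distribution::ClusteredDistribution(keys)) => {
            cols == keys
        }
        _ => false,
    }
}

/// Walks the plan bottom-up and inserts a ShuffleExchange wherever a child's
/// output partitioning does not satisfy its parent's requirement.
pub fn ensure_requirements(plan: Plan, target_partitions: usize) -> Plan {
    let required = plan.required_child_distribution();
    match plan {
        Plan::Scan { partitions } => Plan::Scan { partitions },
        Plan::ShuffleExchange { partitioning, input } => Plan::ShuffleExchange {
            partitioning,
            input: Box::new(ensure_requirements(*input, target_partitions)),
        },
        Plan::HashAggregate { keys, input } => {
            let child = ensure_requirements(*input, target_partitions);
            let child = if satisfies(&child.output_partitioning(), &required) {
                child
            } else {
                Plan::ShuffleExchange {
                    partitioning: Partitioning::HashPartitioning(target_partitions, keys.clone()),
                    input: Box::new(child),
                }
            };
            Plan::HashAggregate { keys, input: Box::new(child) }
        }
    }
}
```

With this shape the aggregate operator itself never has to know how its input was partitioned; it only declares what it needs, and running the rule a second time leaves an already-correct plan unchanged.
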
> h3. *Make execution async*
> The execution plan trait should use the async keyword. This will require 
> adding dependencies on async_trait and smol. This allows us to remove much of 
> the manual thread management and have more efficient execution.
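
To give a feel for what an async execution trait could look like, here is a minimal sketch that uses only the standard library. It deliberately does not use async_trait or smol: the boxed-future return type is written by hand (roughly the shape async_trait generates, as far as I understand it), and `block_on` is a toy busy-polling executor standing in for a real runtime. `Batch`, `InMemoryScan`, `Filter`, and the `execute` signature are hypothetical and not taken from DataFusion.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hand-written boxed future type (an assumption about the eventual API).
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

/// Hypothetical stand-in for a record batch.
type Batch = Vec<i64>;

/// Sketch of an async execution trait: executing a partition yields a future.
trait ExecutionPlan: Send + Sync {
    fn execute(&self, partition: usize) -> BoxFuture<'_, Result<Batch, String>>;
}

/// Leaf operator that serves batches held in memory.
struct InMemoryScan {
    partitions: Vec<Batch>,
}

impl ExecutionPlan for InMemoryScan {
    fn execute(&self, partition: usize) -> BoxFuture<'_, Result<Batch, String>> {
        Box::pin(async move {
            self.partitions
                .get(partition)
                .cloned()
                .ok_or_else(|| format!("no partition {}", partition))
        })
    }
}

/// Operator that awaits its input and keeps values >= `min`.
struct Filter {
    min: i64,
    input: Arc<dyn ExecutionPlan>,
}

impl ExecutionPlan for Filter {
    fn execute(&self, partition: usize) -> BoxFuture<'_, Result<Batch, String>> {
        Box::pin(async move {
            // No thread is spawned here; the operator just awaits its child.
            let batch = self.input.execute(partition).await?;
            let kept: Batch = batch.into_iter().filter(|v| *v >= self.min).collect();
            Ok::<Batch, String>(kept)
        })
    }
}

/// Waker that does nothing; sufficient for the toy executor below.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor: polls the future in a loop until it completes.
/// A real runtime would park the thread instead of spinning.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
        std::thread::yield_now();
    }
}
```

Calling `block_on(plan.execute(0))` on a `Filter` wrapping an `InMemoryScan` drives the whole operator chain on the calling thread, which is the kind of manual thread management the async design is meant to make unnecessary.
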
> The main benefits of these changes are:
>  # Simplify implementation of physical operators, because the optimizer will 
> take care of repartitioning concerns
>  # The ability to print a physical query plan
>  # More efficient query execution because of the use of async
>  # Easier for projects like Ballista to use DataFusion and add their own 
> optimization rules e.g. replacing repartitioning steps with distributed 
> equivalents
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
