[ https://issues.apache.org/jira/browse/ARROW-9464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17176414#comment-17176414 ]
Andy Grove edited comment on ARROW-9464 at 8/12/20, 3:19 PM:
-------------------------------------------------------------

Based on recent experience testing query execution with async, I no longer feel that async makes sense for DataFusion. Async is good for network I/O but not for file I/O. It is better to have a single thread per partition when executing queries. Also, we can't currently use async with Parquet without launching a dedicated thread per partition, which pretty much defeats the point of using async in the first place.

I believe that we do need the concept of executors and a scheduler in DataFusion, where each executor would run on a dedicated thread. Other projects would then be able to extend this, for example for distributed execution.


was (Author: andygrove):
    Based on recent experience testing query execution with async, I no longer feel that async makes sense for DataFusion. Async is good for network I/O but not for file I/O. It is better to have a dedicated thread per partition when executing queries. Also, we can't currently use async with Parquet without launching a dedicated thread per partition, which pretty much defeats the point of using async in the first place.

I believe that we do need the concept of executors and a scheduler in DataFusion, where each executor would run on a dedicated thread. Other projects would then be able to extend this, for example for distributed execution.
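To make the thread-per-partition idea in the comment above concrete, here is a minimal sketch that uses only the Rust standard library. It is not DataFusion code: `Partition`, `sum_partition`, and `execute_partitions` are hypothetical names invented for illustration, and a plain `Vec<i64>` stands in for the data of one partition.

```rust
use std::thread;

/// Hypothetical stand-in for the data of one partition.
type Partition = Vec<i64>;

/// Hypothetical per-partition operator: sums the values in one partition.
fn sum_partition(partition: &Partition) -> i64 {
    partition.iter().sum()
}

/// Runs `op` over every partition, giving each partition its own dedicated
/// OS thread, and returns the per-partition results in partition order.
fn execute_partitions<F>(partitions: Vec<Partition>, op: F) -> Vec<i64>
where
    F: Fn(&Partition) -> i64 + Send + Copy + 'static,
{
    // Spawn one thread per partition; each thread owns its partition.
    let handles: Vec<thread::JoinHandle<i64>> = partitions
        .into_iter()
        .map(|partition| thread::spawn(move || op(&partition)))
        .collect();

    // Join in spawn order so results line up with the input partitions.
    handles
        .into_iter()
        .map(|handle| handle.join().expect("partition thread panicked"))
        .collect()
}
```

Calling `execute_partitions(partitions, sum_partition)` blocks until every partition thread has finished. A scheduler with executors, as suggested in the comment, would presumably reuse a fixed set of threads rather than spawn one per call; that part is not sketched here.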
> [Rust] [DataFusion] Physical plan refactor to support optimization rules and more efficient use of threads
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: ARROW-9464
>                 URL: https://issues.apache.org/jira/browse/ARROW-9464
>             Project: Apache Arrow
>          Issue Type: Improvement
>          Components: Rust, Rust - DataFusion
>            Reporter: Andy Grove
>            Assignee: Andy Grove
>            Priority: Major
>
> I would like to propose a refactor of the physical/execution planning based
> on the experience I have had in implementing distributed execution in
> Ballista.
> This will likely need subtasks, but here is an overview of the changes I am
> proposing.
>
> h3. *Introduce enum to represent physical plan.*
> By wrapping the execution plan structs in an enum, we make it possible to
> build a tree representing the physical plan just like we do with the logical
> plan. This makes it easy to print physical plans and also to apply
> transformations to them.
> {code:java}
> pub enum PhysicalPlan {
>     /// Projection.
>     Projection(Arc<ProjectionExec>),
>     /// Filter a.k.a predicate.
>     Filter(Arc<FilterExec>),
>     /// Hash aggregate
>     HashAggregate(Arc<HashAggregateExec>),
>     /// Performs a hash join of two child relations by first shuffling the data using the join keys.
>     ShuffledHashJoin(ShuffledHashJoinExec),
>     /// Performs a shuffle that will result in the desired partitioning.
>     ShuffleExchange(Arc<ShuffleExchangeExec>),
>     /// Reads results from a ShuffleExchange
>     ShuffleReader(Arc<ShuffleReaderExec>),
>     /// Scans a partitioned data source
>     ParquetScan(Arc<ParquetScanExec>),
>     /// Scans an in-memory table
>     InMemoryTableScan(Arc<InMemoryTableScanExec>),
> }
> {code}
>
> h3. *Introduce physical plan optimization rule to insert "shuffle" operators*
> We should extend the ExecutionPlan trait so that each operator can specify
> its input and output partitioning needs, and then have an optimization rule
> that can insert any repartitioning or reordering steps required.
> For example, these are the methods to be added to ExecutionPlan. This design
> is based on Apache Spark.
>
> {code:java}
> /// Specifies how data is partitioned across different nodes in the cluster
> fn output_partitioning(&self) -> Partitioning {
>     Partitioning::UnknownPartitioning(0)
> }
> /// Specifies the data distribution requirements of all the children for this operator
> fn required_child_distribution(&self) -> Distribution {
>     Distribution::UnspecifiedDistribution
> }
> /// Specifies how data is ordered in each partition
> fn output_ordering(&self) -> Option<Vec<SortOrder>> {
>     None
> }
> /// Specifies the ordering requirements of all the children for this operator
> fn required_child_ordering(&self) -> Option<Vec<Vec<SortOrder>>> {
>     None
> }
> {code}
> A good example of applying this rule would be in the case of hash aggregates,
> where we perform a partial aggregate in parallel across partitions and then
> coalesce the results and apply a final hash aggregate.
> Another example would be a SortMergeExec specifying the sort order required
> for its children.
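The hash aggregate example from the issue description can be sketched as a small tree rewrite. This is only an illustration under stated assumptions, not the proposed DataFusion API: the enum below is a heavily simplified, hypothetical version of `PhysicalPlan` (children are boxed inline instead of wrapping `Arc<...Exec>` structs), `Partitioning` and `Distribution` have only two variants each, and `satisfies`, `ensure_distribution`, and `format_plan` are made-up helper names.

```rust
/// Hypothetical, simplified partitioning description.
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    Unknown(usize),
    Single,
}

/// Hypothetical, simplified distribution requirement.
#[derive(Debug, Clone, PartialEq)]
enum Distribution {
    Unspecified,
    SinglePartition,
}

/// Hypothetical, simplified physical plan tree.
#[derive(Debug, Clone, PartialEq)]
enum PhysicalPlan {
    ParquetScan { partitions: usize },
    PartialHashAggregate(Box<PhysicalPlan>),
    FinalHashAggregate(Box<PhysicalPlan>),
    ShuffleExchange(Box<PhysicalPlan>, Partitioning),
}

impl PhysicalPlan {
    /// How this operator's output is partitioned.
    fn output_partitioning(&self) -> Partitioning {
        match self {
            PhysicalPlan::ParquetScan { partitions } => Partitioning::Unknown(*partitions),
            PhysicalPlan::PartialHashAggregate(child) => child.output_partitioning(),
            PhysicalPlan::FinalHashAggregate(_) => Partitioning::Single,
            PhysicalPlan::ShuffleExchange(_, partitioning) => partitioning.clone(),
        }
    }

    /// What this operator requires of its child's distribution.
    fn required_child_distribution(&self) -> Distribution {
        match self {
            PhysicalPlan::FinalHashAggregate(_) => Distribution::SinglePartition,
            _ => Distribution::Unspecified,
        }
    }
}

/// Returns true if `partitioning` meets the `required` distribution.
fn satisfies(partitioning: &Partitioning, required: &Distribution) -> bool {
    match required {
        Distribution::Unspecified => true,
        Distribution::SinglePartition => *partitioning == Partitioning::Single,
    }
}

/// Optimization rule: walks the tree bottom-up and inserts a ShuffleExchange
/// wherever a child's output does not meet its parent's requirement.
fn ensure_distribution(plan: PhysicalPlan) -> PhysicalPlan {
    let required = plan.required_child_distribution();
    let fix = |child: Box<PhysicalPlan>| -> Box<PhysicalPlan> {
        let child = ensure_distribution(*child);
        if satisfies(&child.output_partitioning(), &required) {
            Box::new(child)
        } else {
            Box::new(PhysicalPlan::ShuffleExchange(Box::new(child), Partitioning::Single))
        }
    };
    match plan {
        scan @ PhysicalPlan::ParquetScan { .. } => scan,
        PhysicalPlan::PartialHashAggregate(c) => PhysicalPlan::PartialHashAggregate(fix(c)),
        PhysicalPlan::FinalHashAggregate(c) => PhysicalPlan::FinalHashAggregate(fix(c)),
        PhysicalPlan::ShuffleExchange(c, p) => PhysicalPlan::ShuffleExchange(fix(c), p),
    }
}

/// Renders the plan as an indented tree, one operator per line.
fn format_plan(plan: &PhysicalPlan, depth: usize) -> String {
    let pad = "  ".repeat(depth);
    match plan {
        PhysicalPlan::ParquetScan { partitions } => {
            format!("{}ParquetScan: partitions={}\n", pad, partitions)
        }
        PhysicalPlan::PartialHashAggregate(c) => {
            format!("{}PartialHashAggregate\n{}", pad, format_plan(c, depth + 1))
        }
        PhysicalPlan::FinalHashAggregate(c) => {
            format!("{}FinalHashAggregate\n{}", pad, format_plan(c, depth + 1))
        }
        PhysicalPlan::ShuffleExchange(c, p) => {
            format!("{}ShuffleExchange: {:?}\n{}", pad, p, format_plan(c, depth + 1))
        }
    }
}
```

Given a final aggregate directly over a partial aggregate over a four-partition scan, `ensure_distribution` places a `ShuffleExchange` between the two aggregates, and `format_plan` prints the resulting tree. Because the plan is an enum, both the rewrite and the printer are ordinary recursive `match` expressions, which is the benefit the description attributes to the enum representation.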