[jira] [Updated] (ARROW-9464) [Rust] [DataFusion] Physical plan refactor to support async and optimization rules

2020-08-12 Thread Andy Grove (Jira)


 [ 
https://issues.apache.org/jira/browse/ARROW-9464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Grove updated ARROW-9464:
--
Description: 
I would like to propose a refactor of the physical/execution planning based on 
the experience I have had in implementing distributed execution in Ballista.

This will likely need subtasks but here is an overview of the changes I am 
proposing.
h3. *Introduce enum to represent physical plan.*

By wrapping the execution plan structs in an enum, we make it possible to build 
a tree representing the physical plan just like we do with the logical plan. 
This makes it easy to print physical plans and also to apply transformations to 
it.
{code:java}
 pub enum PhysicalPlan {
/// Projection.
Projection(Arc),
/// Filter a.k.a predicate.
Filter(Arc),
/// Hash aggregate
HashAggregate(Arc),
/// Performs a hash join of two child relations by first shuffling the data 
using the join keys.
ShuffledHashJoin(ShuffledHashJoinExec),
/// Performs a shuffle that will result in the desired partitioning.
ShuffleExchange(Arc),
/// Reads results from a ShuffleExchange
ShuffleReader(Arc),
/// Scans a partitioned data source
ParquetScan(Arc),
/// Scans an in-memory table
InMemoryTableScan(Arc),
}{code}
h3. *Introduce physical plan optimization rule to insert "shuffle" operators*

We should extend the ExecutionPlan trait so that each operator can specify its 
input and output partitioning needs, and then have an optimization rule that 
can insert any repartioning or reordering steps required.

For example, these are the methods to be added to ExecutionPlan. This design is 
based on Apache Spark.

 
{code:java}
/// Specifies how data is partitioned across different nodes in the cluster
fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(0)
}

/// Specifies the data distribution requirements of all the children for this 
operator
fn required_child_distribution(&self) -> Distribution {
Distribution::UnspecifiedDistribution
}

/// Specifies how data is ordered in each partition
fn output_ordering(&self) -> Option> {
None
}

/// Specifies the data distribution requirements of all the children for this 
operator
fn required_child_ordering(&self) -> Option>> {
None
}
 {code}
A good example of applying this rule would be in the case of hash aggregates 
where we perform a partial aggregate in parallel across partitions and then 
coalesce the results and apply a final hash aggregate.

Another example would be a SortMergeExec specifying the sort order required for 
its children.

 

 

  was:
I would like to propose a refactor of the physical/execution planning based on 
the experience I have had in implementing distributed execution in Ballista.

This will likely need subtasks but here is an overview of the changes I am 
proposing.
h3. *Introduce enum to represent physical plan.*

By wrapping the execution plan structs in an enum, we make it possible to build 
a tree representing the physical plan just like we do with the logical plan. 
This makes it easy to print physical plans and also to apply transformations to 
it.
{code:java}
 pub enum PhysicalPlan {
/// Projection.
Projection(Arc),
/// Filter a.k.a predicate.
Filter(Arc),
/// Hash aggregate
HashAggregate(Arc),
/// Performs a hash join of two child relations by first shuffling the data 
using the join keys.
ShuffledHashJoin(ShuffledHashJoinExec),
/// Performs a shuffle that will result in the desired partitioning.
ShuffleExchange(Arc),
/// Reads results from a ShuffleExchange
ShuffleReader(Arc),
/// Scans a partitioned data source
ParquetScan(Arc),
/// Scans an in-memory table
InMemoryTableScan(Arc),
}{code}
h3. *Introduce physical plan optimization rule to insert "shuffle" operators*

We should extend the ExecutionPlan trait so that each operator can specify its 
input and output partitioning needs, and then have an optimization rule that 
can insert any repartioning or reordering steps required.

For example, these are the methods to be added to ExecutionPlan. This design is 
based on Apache Spark.

 
{code:java}
/// Specifies how data is partitioned across different nodes in the cluster
fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(0)
}

/// Specifies the data distribution requirements of all the children for this 
operator
fn required_child_distribution(&self) -> Distribution {
Distribution::UnspecifiedDistribution
}

/// Specifies how data is ordered in each partition
fn output_ordering(&self) -> Option> {
None
}

/// Specifies the data distribution requirements of all the children for this 
operator
fn required_child_ordering(&self) -> Option>> {
None
}
 {code}
A good example of applying this rule would be in the case of hash aggregates 
where we perform

[jira] [Updated] (ARROW-9464) [Rust] [DataFusion] Physical plan refactor to support async and optimization rules

2020-07-14 Thread Andy Grove (Jira)


 [ 
https://issues.apache.org/jira/browse/ARROW-9464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Grove updated ARROW-9464:
--
Description: 
I would like to propose a refactor of the physical/execution planning based on 
the experience I have had in implementing distributed execution in Ballista.

This will likely need subtasks but here is an overview of the changes I am 
proposing.
h3. *Introduce enum to represent physical plan.*

By wrapping the execution plan structs in an enum, we make it possible to build 
a tree representing the physical plan just like we do with the logical plan. 
This makes it easy to print physical plans and also to apply transformations to 
it.
{code:java}
 pub enum PhysicalPlan {
/// Projection.
Projection(Arc),
/// Filter a.k.a predicate.
Filter(Arc),
/// Hash aggregate
HashAggregate(Arc),
/// Performs a hash join of two child relations by first shuffling the data 
using the join keys.
ShuffledHashJoin(ShuffledHashJoinExec),
/// Performs a shuffle that will result in the desired partitioning.
ShuffleExchange(Arc),
/// Reads results from a ShuffleExchange
ShuffleReader(Arc),
/// Scans a partitioned data source
ParquetScan(Arc),
/// Scans an in-memory table
InMemoryTableScan(Arc),
}{code}
h3. *Introduce physical plan optimization rule to insert "shuffle" operators*

We should extend the ExecutionPlan trait so that each operator can specify its 
input and output partitioning needs, and then have an optimization rule that 
can insert any repartioning or reordering steps required.

For example, these are the methods to be added to ExecutionPlan. This design is 
based on Apache Spark.

 
{code:java}
/// Specifies how data is partitioned across different nodes in the cluster
fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(0)
}

/// Specifies the data distribution requirements of all the children for this 
operator
fn required_child_distribution(&self) -> Distribution {
Distribution::UnspecifiedDistribution
}

/// Specifies how data is ordered in each partition
fn output_ordering(&self) -> Option> {
None
}

/// Specifies the data distribution requirements of all the children for this 
operator
fn required_child_ordering(&self) -> Option>> {
None
}
 {code}
A good example of applying this rule would be in the case of hash aggregates 
where we perform a partial aggregate in parallel across partitions and then 
coalesce the results and apply a final hash aggregate.

Another example would be a SortMergeExec specifying the sort order required for 
its children.
h3. Make execution async

The execution plan trait should use the async keyword. This will require adding 
dependencies on async_trait and smol. This allows us to remove much of the 
manual thread management and have more efficient execution.

The main benefits of these changes are:
 # Simplify implementation of physical operators, because the optimizer will 
take care of repartitioning concerns
 # The ability to print a physical query plan
 # More efficient query execution because of the use of async
 # Easier for projects like Ballista to use DataFusion and add their own 
optimization rules e.g. replacing repartitioning steps with distributed 
equivalents

 

  was:
I would like to propose a refactor of the physical/execution planning based on 
the experience I have had in implementing distributed execution in Ballista.

This will likely need subtasks but here is an overview of the changes I am 
proposing.

## 1. Introduce enum to represent physical plan.

By wrapping the execution plan structs in an enum, we make it possible to build 
a tree representing the physical plan just like we do with the logical plan. 
This makes it easy to print physical plans and also to apply transformations to 
it.
{code:java}
 pub enum PhysicalPlan {
/// Projection.
Projection(Arc),
/// Filter a.k.a predicate.
Filter(Arc),
/// Hash aggregate
HashAggregate(Arc),
/// Performs a hash join of two child relations by first shuffling the data 
using the join keys.
ShuffledHashJoin(ShuffledHashJoinExec),
/// Performs a shuffle that will result in the desired partitioning.
ShuffleExchange(Arc),
/// Reads results from a ShuffleExchange
ShuffleReader(Arc),
/// Scans a partitioned data source
ParquetScan(Arc),
/// Scans an in-memory table
InMemoryTableScan(Arc),
}{code}
## 2. Introduce physical plan optimization rule to insert "shuffle" operators

We should extend the ExecutionPlan trait so that each operator can specify its 
input and output partitioning needs, and then have an optimization rule that 
can insert any repartioning or reordering steps required.

For example, these are the methods to be added to ExecutionPlan. This design is 
based on Apache Spark.

 
{code:java}
/// Specifies how data is partitioned across 

[jira] [Updated] (ARROW-9464) [Rust] [DataFusion] Physical plan refactor to support async and optimization rules

2020-07-14 Thread Andy Grove (Jira)


 [ 
https://issues.apache.org/jira/browse/ARROW-9464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Grove updated ARROW-9464:
--
Description: 
I would like to propose a refactor of the physical/execution planning based on 
the experience I have had in implementing distributed execution in Ballista.

This will likely need subtasks but here is an overview of the changes I am 
proposing.

## 1. Introduce enum to represent physical plan.

By wrapping the execution plan structs in an enum, we make it possible to build 
a tree representing the physical plan just like we do with the logical plan. 
This makes it easy to print physical plans and also to apply transformations to 
it.
{code:java}
 pub enum PhysicalPlan {
/// Projection.
Projection(Arc),
/// Filter a.k.a predicate.
Filter(Arc),
/// Hash aggregate
HashAggregate(Arc),
/// Performs a hash join of two child relations by first shuffling the data 
using the join keys.
ShuffledHashJoin(ShuffledHashJoinExec),
/// Performs a shuffle that will result in the desired partitioning.
ShuffleExchange(Arc),
/// Reads results from a ShuffleExchange
ShuffleReader(Arc),
/// Scans a partitioned data source
ParquetScan(Arc),
/// Scans an in-memory table
InMemoryTableScan(Arc),
}{code}
## 2. Introduce physical plan optimization rule to insert "shuffle" operators

We should extend the ExecutionPlan trait so that each operator can specify its 
input and output partitioning needs, and then have an optimization rule that 
can insert any repartioning or reordering steps required.

For example, these are the methods to be added to ExecutionPlan. This design is 
based on Apache Spark.

 
{code:java}
/// Specifies how data is partitioned across different nodes in the cluster
fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(0)
}

/// Specifies the data distribution requirements of all the children for this 
operator
fn required_child_distribution(&self) -> Distribution {
Distribution::UnspecifiedDistribution
}

/// Specifies how data is ordered in each partition
fn output_ordering(&self) -> Option> {
None
}

/// Specifies the data distribution requirements of all the children for this 
operator
fn required_child_ordering(&self) -> Option>> {
None
}
 {code}
A good example of applying this rule would be in the case of hash aggregates 
where we perform a partial aggregate in parallel across partitions and then 
coalesce the results and apply a final hash aggregate.

Another example would be a SortMergeExec specifying the sort order required for 
its children.

## 3. Make execution async

The execution plan trait should use the async keyword. This will require adding 
dependencies on async_trait and smol. This allows us to remove much of the 
manual thread management and have more efficient execution.

The main benefits of these changes are:
 # Simplify implementation of physical operators, because the optimizer will 
take care of repartitioning concerns
 # The ability to print a physical query plan
 # More efficient query execution because of the use of async
 # Easier for projects like Ballista to use DataFusion and add their own 
optimization rules e.g. replacing repartitioning steps with distributed 
equivalents

 

  was:
I would like to propose a refactor of the physical/execution planning based 
experience I have had in implementing distributed execution in Ballista.

This will likely need subtasks but here is an overview of the changes I am 
proposing.

>> 1. Introduce enum to represent physical plan.

By wrapping the execution plan structs in an enum, we make it possible to build 
a tree representing the physical plan just like we do with the logical plan. 
This makes it easy to print physical plans and also to apply transformations to 
it.
{code:java}
 pub enum PhysicalPlan {
/// Projection.
Projection(Arc),
/// Filter a.k.a predicate.
Filter(Arc),
/// Hash aggregate
HashAggregate(Arc),
/// Performs a hash join of two child relations by first shuffling the data 
using the join keys.
ShuffledHashJoin(ShuffledHashJoinExec),
/// Performs a shuffle that will result in the desired partitioning.
ShuffleExchange(Arc),
/// Reads results from a ShuffleExchange
ShuffleReader(Arc),
/// Scans a partitioned data source
ParquetScan(Arc),
/// Scans an in-memory table
InMemoryTableScan(Arc),
}{code}
>> 2. Introduce physical plan optimization rule to insert "shuffle" operators

We should extend the ExecutionPlan trait so that each operator can specify its 
input and output partitioning needs, and then have an optimization rule that 
can insert any repartioning or reordering steps required.

For example, these are the methods to be added to ExecutionPlan. This design is 
based on Apache Spark.

 
{code:java}
/// Specifies how data is partitioned across dif