alamb commented on a change in pull request #1732:
URL: https://github.com/apache/arrow-datafusion/pull/1732#discussion_r798039149



##########
File path: datafusion/src/physical_optimizer/repartition.rs
##########
@@ -107,93 +105,148 @@ impl PhysicalOptimizerRule for Repartition {
 }
 #[cfg(test)]
 mod tests {
-    use arrow::datatypes::Schema;
+    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 
     use super::*;
     use crate::datasource::PartitionedFile;
+    use crate::physical_plan::expressions::col;
     use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
-    use crate::physical_plan::projection::ProjectionExec;
-    use crate::physical_plan::Statistics;
+    use crate::physical_plan::filter::FilterExec;
+    use crate::physical_plan::hash_aggregate::{AggregateMode, 
HashAggregateExec};
+    use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+    use crate::physical_plan::{displayable, Statistics};
     use crate::test::object_store::TestObjectStore;
 
+    fn schema() -> SchemaRef {
+        Arc::new(Schema::new(vec![Field::new("c1", DataType::Boolean, true)]))
+    }
+
+    fn parquet_exec() -> Arc<ParquetExec> {
+        Arc::new(ParquetExec::new(
+            FileScanConfig {
+                object_store: TestObjectStore::new_arc(&[("x", 100)]),
+                file_schema: schema(),
+                file_groups: vec![vec![PartitionedFile::new("x".to_string(), 
100)]],
+                statistics: Statistics::default(),
+                projection: None,
+                limit: None,
+                table_partition_cols: vec![],
+            },
+            None,
+        ))
+    }
+
+    fn filter_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+        Arc::new(FilterExec::try_new(col("c1", &schema()).unwrap(), 
input).unwrap())
+    }
+
+    fn hash_aggregate(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> 
{
+        let schema = schema();
+        Arc::new(
+            HashAggregateExec::try_new(
+                AggregateMode::Final,
+                vec![],
+                vec![],
+                Arc::new(
+                    HashAggregateExec::try_new(
+                        AggregateMode::Partial,
+                        vec![],
+                        vec![],
+                        input,
+                        schema.clone(),
+                    )
+                    .unwrap(),
+                ),
+                schema,
+            )
+            .unwrap(),
+        )
+    }
+
+    fn limit_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+        Arc::new(GlobalLimitExec::new(
+            Arc::new(LocalLimitExec::new(input, 100)),
+            100,
+        ))
+    }
+
+    fn trim_plan_display(plan: &str) -> Vec<&str> {
+        plan.split('\n')
+            .map(|s| s.trim())
+            .filter(|s| !s.is_empty())
+            .collect()
+    }
+
     #[test]
     fn added_repartition_to_single_partition() -> Result<()> {
-        let file_schema = Arc::new(Schema::empty());
-        let parquet_project = ProjectionExec::try_new(
-            vec![],
-            Arc::new(ParquetExec::new(
-                FileScanConfig {
-                    object_store: TestObjectStore::new_arc(&[("x", 100)]),
-                    file_schema,
-                    file_groups: 
vec![vec![PartitionedFile::new("x".to_string(), 100)]],
-                    statistics: Statistics::default(),
-                    projection: None,
-                    limit: None,
-                    table_partition_cols: vec![],
-                },
-                None,
-            )),
-        )?;
-
         let optimizer = Repartition {};
 
         let optimized = optimizer.optimize(
-            Arc::new(parquet_project),
+            hash_aggregate(parquet_exec()),
             &ExecutionConfig::new().with_target_partitions(10),
         )?;
 
-        assert_eq!(
-            optimized.children()[0]
-                .output_partitioning()
-                .partition_count(),
-            10
-        );
+        let plan = displayable(optimized.as_ref()).indent().to_string();
+
+        let expected = &[
+            "HashAggregateExec: mode=Final, gby=[], aggr=[]",
+            "HashAggregateExec: mode=Partial, gby=[], aggr=[]",
+            "RepartitionExec: partitioning=RoundRobinBatch(10)",
+            "ParquetExec: limit=None, partitions=[x]",
+        ];
 
+        assert_eq!(&trim_plan_display(&plan), &expected);
         Ok(())
     }
 
     #[test]
     fn repartition_deepest_node() -> Result<()> {
-        let file_schema = Arc::new(Schema::empty());
-        let parquet_project = ProjectionExec::try_new(
-            vec![],
-            Arc::new(ProjectionExec::try_new(
-                vec![],
-                Arc::new(ParquetExec::new(
-                    FileScanConfig {
-                        object_store: TestObjectStore::new_arc(&[("x", 100)]),
-                        file_schema,
-                        file_groups: vec![vec![PartitionedFile::new(
-                            "x".to_string(),
-                            100,
-                        )]],
-                        statistics: Statistics::default(),
-                        projection: None,
-                        limit: None,
-                        table_partition_cols: vec![],
-                    },
-                    None,
-                )),
-            )?),
+        let optimizer = Repartition {};
+
+        let optimized = optimizer.optimize(
+            hash_aggregate(filter_exec(parquet_exec())),
+            &ExecutionConfig::new().with_target_partitions(10),
         )?;
 
+        let plan = displayable(optimized.as_ref()).indent().to_string();
+
+        let expected = &[
+            "HashAggregateExec: mode=Final, gby=[], aggr=[]",
+            "HashAggregateExec: mode=Partial, gby=[], aggr=[]",
+            "FilterExec: c1@0",
+            "RepartitionExec: partitioning=RoundRobinBatch(10)",
+            "ParquetExec: limit=None, partitions=[x]",
+        ];
+
+        assert_eq!(&trim_plan_display(&plan), &expected);
+        Ok(())
+    }
+
+    #[test]
+    fn repartition_ignores() -> Result<()> {
         let optimizer = Repartition {};
 
         let optimized = optimizer.optimize(
-            Arc::new(parquet_project),
+            
hash_aggregate(limit_exec(filter_exec(limit_exec(parquet_exec())))),
             &ExecutionConfig::new().with_target_partitions(10),
         )?;
 
-        // RepartitionExec is added to deepest node
-        assert!(optimized.children()[0]
-            .as_any()
-            .downcast_ref::<RepartitionExec>()
-            .is_none());
-        assert!(optimized.children()[0].children()[0]
-            .as_any()
-            .downcast_ref::<RepartitionExec>()
-            .is_some());
-
+        let plan = displayable(optimized.as_ref()).indent().to_string();
+
+        let expected = &[
+            "HashAggregateExec: mode=Final, gby=[], aggr=[]",
+            "HashAggregateExec: mode=Partial, gby=[], aggr=[]",
+            "RepartitionExec: partitioning=RoundRobinBatch(10)",
+            "GlobalLimitExec: limit=100",
+            "LocalLimitExec: limit=100",

Review comment:
       maybe worth a comment on the rationale here. Something like
   ```suggestion
               // Expect no repartition to happen for local limit
               "LocalLimitExec: limit=100",
   ```

##########
File path: datafusion/src/physical_optimizer/repartition.rs
##########
@@ -107,93 +105,148 @@ impl PhysicalOptimizerRule for Repartition {
 }
 #[cfg(test)]
 mod tests {
-    use arrow::datatypes::Schema;
+    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 
     use super::*;
     use crate::datasource::PartitionedFile;
+    use crate::physical_plan::expressions::col;
     use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
-    use crate::physical_plan::projection::ProjectionExec;
-    use crate::physical_plan::Statistics;
+    use crate::physical_plan::filter::FilterExec;
+    use crate::physical_plan::hash_aggregate::{AggregateMode, 
HashAggregateExec};
+    use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+    use crate::physical_plan::{displayable, Statistics};
     use crate::test::object_store::TestObjectStore;
 
+    fn schema() -> SchemaRef {
+        Arc::new(Schema::new(vec![Field::new("c1", DataType::Boolean, true)]))
+    }
+
+    fn parquet_exec() -> Arc<ParquetExec> {
+        Arc::new(ParquetExec::new(
+            FileScanConfig {
+                object_store: TestObjectStore::new_arc(&[("x", 100)]),
+                file_schema: schema(),
+                file_groups: vec![vec![PartitionedFile::new("x".to_string(), 
100)]],
+                statistics: Statistics::default(),
+                projection: None,
+                limit: None,
+                table_partition_cols: vec![],
+            },
+            None,
+        ))
+    }
+
+    fn filter_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+        Arc::new(FilterExec::try_new(col("c1", &schema()).unwrap(), 
input).unwrap())
+    }
+
+    fn hash_aggregate(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> 
{
+        let schema = schema();
+        Arc::new(
+            HashAggregateExec::try_new(
+                AggregateMode::Final,
+                vec![],
+                vec![],
+                Arc::new(
+                    HashAggregateExec::try_new(
+                        AggregateMode::Partial,
+                        vec![],
+                        vec![],
+                        input,
+                        schema.clone(),
+                    )
+                    .unwrap(),
+                ),
+                schema,
+            )
+            .unwrap(),
+        )
+    }
+
+    fn limit_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+        Arc::new(GlobalLimitExec::new(
+            Arc::new(LocalLimitExec::new(input, 100)),
+            100,
+        ))
+    }
+
+    fn trim_plan_display(plan: &str) -> Vec<&str> {
+        plan.split('\n')
+            .map(|s| s.trim())
+            .filter(|s| !s.is_empty())
+            .collect()
+    }
+
     #[test]
     fn added_repartition_to_single_partition() -> Result<()> {
-        let file_schema = Arc::new(Schema::empty());
-        let parquet_project = ProjectionExec::try_new(
-            vec![],
-            Arc::new(ParquetExec::new(
-                FileScanConfig {
-                    object_store: TestObjectStore::new_arc(&[("x", 100)]),
-                    file_schema,
-                    file_groups: 
vec![vec![PartitionedFile::new("x".to_string(), 100)]],
-                    statistics: Statistics::default(),
-                    projection: None,
-                    limit: None,
-                    table_partition_cols: vec![],
-                },
-                None,
-            )),
-        )?;
-
         let optimizer = Repartition {};
 
         let optimized = optimizer.optimize(
-            Arc::new(parquet_project),
+            hash_aggregate(parquet_exec()),
             &ExecutionConfig::new().with_target_partitions(10),
         )?;
 
-        assert_eq!(
-            optimized.children()[0]
-                .output_partitioning()
-                .partition_count(),
-            10
-        );
+        let plan = displayable(optimized.as_ref()).indent().to_string();
+
+        let expected = &[
+            "HashAggregateExec: mode=Final, gby=[], aggr=[]",
+            "HashAggregateExec: mode=Partial, gby=[], aggr=[]",
+            "RepartitionExec: partitioning=RoundRobinBatch(10)",
+            "ParquetExec: limit=None, partitions=[x]",
+        ];
 
+        assert_eq!(&trim_plan_display(&plan), &expected);
         Ok(())
     }
 
     #[test]
     fn repartition_deepest_node() -> Result<()> {
-        let file_schema = Arc::new(Schema::empty());
-        let parquet_project = ProjectionExec::try_new(
-            vec![],
-            Arc::new(ProjectionExec::try_new(
-                vec![],
-                Arc::new(ParquetExec::new(
-                    FileScanConfig {
-                        object_store: TestObjectStore::new_arc(&[("x", 100)]),
-                        file_schema,
-                        file_groups: vec![vec![PartitionedFile::new(
-                            "x".to_string(),
-                            100,
-                        )]],
-                        statistics: Statistics::default(),
-                        projection: None,
-                        limit: None,
-                        table_partition_cols: vec![],
-                    },
-                    None,
-                )),
-            )?),
+        let optimizer = Repartition {};
+
+        let optimized = optimizer.optimize(
+            hash_aggregate(filter_exec(parquet_exec())),
+            &ExecutionConfig::new().with_target_partitions(10),
         )?;
 
+        let plan = displayable(optimized.as_ref()).indent().to_string();
+
+        let expected = &[

Review comment:
       I think this is a good improvement to the tests 👍 

##########
File path: datafusion/src/physical_plan/mod.rs
##########
@@ -135,14 +135,32 @@ pub trait ExecutionPlan: Debug + Send + Sync {
     /// Returns the execution plan as [`Any`](std::any::Any) so that it can be
     /// downcast to a specific implementation.
     fn as_any(&self) -> &dyn Any;
+
     /// Get the schema for this execution plan
     fn schema(&self) -> SchemaRef;
+
     /// Specifies the output partitioning scheme of this plan
     fn output_partitioning(&self) -> Partitioning;
+
     /// Specifies the data distribution requirements of all the children for 
this operator
     fn required_child_distribution(&self) -> Distribution {
         Distribution::UnspecifiedDistribution
     }
+
+    /// Returns `true` if the direct children of this `ExecutionPlan` should 
be repartitioned
+    /// to introduce greater concurrency to the plan
+    ///
+    /// The default implementation returns `true` unless 
`Self::request_child_distribution`

Review comment:
       ```suggestion
       /// The default implementation returns `true` unless 
`Self::required_child_distribution`
   ```

##########
File path: datafusion/src/physical_optimizer/repartition.rs
##########
@@ -107,93 +105,148 @@ impl PhysicalOptimizerRule for Repartition {
 }
 #[cfg(test)]
 mod tests {
-    use arrow::datatypes::Schema;
+    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 
     use super::*;
     use crate::datasource::PartitionedFile;
+    use crate::physical_plan::expressions::col;
     use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
-    use crate::physical_plan::projection::ProjectionExec;
-    use crate::physical_plan::Statistics;
+    use crate::physical_plan::filter::FilterExec;
+    use crate::physical_plan::hash_aggregate::{AggregateMode, 
HashAggregateExec};
+    use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+    use crate::physical_plan::{displayable, Statistics};
     use crate::test::object_store::TestObjectStore;
 
+    fn schema() -> SchemaRef {
+        Arc::new(Schema::new(vec![Field::new("c1", DataType::Boolean, true)]))
+    }
+
+    fn parquet_exec() -> Arc<ParquetExec> {
+        Arc::new(ParquetExec::new(
+            FileScanConfig {
+                object_store: TestObjectStore::new_arc(&[("x", 100)]),
+                file_schema: schema(),
+                file_groups: vec![vec![PartitionedFile::new("x".to_string(), 
100)]],
+                statistics: Statistics::default(),
+                projection: None,
+                limit: None,
+                table_partition_cols: vec![],
+            },
+            None,
+        ))
+    }
+
+    fn filter_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+        Arc::new(FilterExec::try_new(col("c1", &schema()).unwrap(), 
input).unwrap())
+    }
+
+    fn hash_aggregate(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> 
{
+        let schema = schema();
+        Arc::new(
+            HashAggregateExec::try_new(
+                AggregateMode::Final,
+                vec![],
+                vec![],
+                Arc::new(
+                    HashAggregateExec::try_new(
+                        AggregateMode::Partial,
+                        vec![],
+                        vec![],
+                        input,
+                        schema.clone(),
+                    )
+                    .unwrap(),
+                ),
+                schema,
+            )
+            .unwrap(),
+        )
+    }
+
+    fn limit_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+        Arc::new(GlobalLimitExec::new(
+            Arc::new(LocalLimitExec::new(input, 100)),
+            100,
+        ))
+    }
+
+    fn trim_plan_display(plan: &str) -> Vec<&str> {
+        plan.split('\n')
+            .map(|s| s.trim())
+            .filter(|s| !s.is_empty())
+            .collect()
+    }
+
     #[test]
     fn added_repartition_to_single_partition() -> Result<()> {
-        let file_schema = Arc::new(Schema::empty());
-        let parquet_project = ProjectionExec::try_new(
-            vec![],
-            Arc::new(ParquetExec::new(
-                FileScanConfig {
-                    object_store: TestObjectStore::new_arc(&[("x", 100)]),
-                    file_schema,
-                    file_groups: 
vec![vec![PartitionedFile::new("x".to_string(), 100)]],
-                    statistics: Statistics::default(),
-                    projection: None,
-                    limit: None,
-                    table_partition_cols: vec![],
-                },
-                None,
-            )),
-        )?;
-
         let optimizer = Repartition {};
 
         let optimized = optimizer.optimize(
-            Arc::new(parquet_project),
+            hash_aggregate(parquet_exec()),
             &ExecutionConfig::new().with_target_partitions(10),
         )?;
 
-        assert_eq!(
-            optimized.children()[0]
-                .output_partitioning()
-                .partition_count(),
-            10
-        );
+        let plan = displayable(optimized.as_ref()).indent().to_string();
+
+        let expected = &[
+            "HashAggregateExec: mode=Final, gby=[], aggr=[]",
+            "HashAggregateExec: mode=Partial, gby=[], aggr=[]",
+            "RepartitionExec: partitioning=RoundRobinBatch(10)",
+            "ParquetExec: limit=None, partitions=[x]",
+        ];
 
+        assert_eq!(&trim_plan_display(&plan), &expected);
         Ok(())
     }
 
     #[test]
     fn repartition_deepest_node() -> Result<()> {
-        let file_schema = Arc::new(Schema::empty());
-        let parquet_project = ProjectionExec::try_new(
-            vec![],
-            Arc::new(ProjectionExec::try_new(
-                vec![],
-                Arc::new(ParquetExec::new(
-                    FileScanConfig {
-                        object_store: TestObjectStore::new_arc(&[("x", 100)]),
-                        file_schema,
-                        file_groups: vec![vec![PartitionedFile::new(
-                            "x".to_string(),
-                            100,
-                        )]],
-                        statistics: Statistics::default(),
-                        projection: None,
-                        limit: None,
-                        table_partition_cols: vec![],
-                    },
-                    None,
-                )),
-            )?),
+        let optimizer = Repartition {};
+
+        let optimized = optimizer.optimize(
+            hash_aggregate(filter_exec(parquet_exec())),
+            &ExecutionConfig::new().with_target_partitions(10),
         )?;
 
+        let plan = displayable(optimized.as_ref()).indent().to_string();
+
+        let expected = &[
+            "HashAggregateExec: mode=Final, gby=[], aggr=[]",
+            "HashAggregateExec: mode=Partial, gby=[], aggr=[]",
+            "FilterExec: c1@0",
+            "RepartitionExec: partitioning=RoundRobinBatch(10)",
+            "ParquetExec: limit=None, partitions=[x]",
+        ];
+
+        assert_eq!(&trim_plan_display(&plan), &expected);
+        Ok(())
+    }
+
+    #[test]
+    fn repartition_ignores() -> Result<()> {
         let optimizer = Repartition {};
 
         let optimized = optimizer.optimize(
-            Arc::new(parquet_project),
+            
hash_aggregate(limit_exec(filter_exec(limit_exec(parquet_exec())))),
             &ExecutionConfig::new().with_target_partitions(10),
         )?;
 
-        // RepartitionExec is added to deepest node
-        assert!(optimized.children()[0]
-            .as_any()
-            .downcast_ref::<RepartitionExec>()
-            .is_none());
-        assert!(optimized.children()[0].children()[0]
-            .as_any()
-            .downcast_ref::<RepartitionExec>()
-            .is_some());
-
+        let plan = displayable(optimized.as_ref()).indent().to_string();
+
+        let expected = &[
+            "HashAggregateExec: mode=Final, gby=[], aggr=[]",
+            "HashAggregateExec: mode=Partial, gby=[], aggr=[]",
+            "RepartitionExec: partitioning=RoundRobinBatch(10)",
+            "GlobalLimitExec: limit=100",
+            "LocalLimitExec: limit=100",
+            "FilterExec: c1@0",
+            "RepartitionExec: partitioning=RoundRobinBatch(10)",
+            "GlobalLimitExec: limit=100",
+            "LocalLimitExec: limit=100", // Should not repartition for 
LocalLimitExec
+            "ParquetExec: limit=None, partitions=[x]",
+        ];
+

Review comment:
       Could we also add a test for `UnionExec` as well? Mostly to guard 
against regression?
   
   I notice that there is already coverage for repartitioning projection which 
is good 👍 
   
https://github.com/apache/arrow-datafusion/pull/1732/commits/c4c0a1ce876bcaff5096e19c44e5cc3ea0f38760

##########
File path: datafusion/src/physical_plan/limit.rs
##########
@@ -300,6 +300,10 @@ impl ExecutionPlan for LocalLimitExec {
             _ => Statistics::default(),
         }
     }
+
+    fn should_repartition_children(&self) -> bool {

Review comment:
       ```suggestion
       /// No reason to repartition children as this node is just limiting each 
input partition. 
       fn should_repartition_children(&self) -> bool {
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to