NGA-TRAN commented on a change in pull request #337:
URL: https://github.com/apache/arrow-datafusion/pull/337#discussion_r632078747
########## File path: datafusion/tests/sql.rs ##########
@@ -2885,3 +2885,45 @@ async fn test_cast_expressions_error() -> Result<()> {
     Ok(())
 }
+
+#[tokio::test]
+async fn test_physical_plan_display_indent() {
+    let mut ctx = ExecutionContext::new();
+    register_aggregate_csv(&mut ctx).unwrap();
+    let sql = "SELECT c1, MAX(c12), MIN(c12) as the_min \
+               FROM aggregate_test_100 \
+               WHERE c12 < 10 \
+               GROUP BY c1 \
+               ORDER BY the_min DESC \
+               LIMIT 10";
+    let plan = ctx.create_logical_plan(&sql).unwrap();
+    let plan = ctx.optimize(&plan).unwrap();
+
+    let physical_plan = ctx.create_physical_plan(&plan).unwrap();
+    let expected = vec![
+        "GlobalLimitExec: limit=10",
+        "  SortExec: [the_min DESC]",
+        "    ProjectionExec: expr=[c1, MAX(c12), MIN(c12) as the_min]",
+        "      HashAggregateExec: mode=Final, gby=[c1], aggr=[MAX(c12), MIN(c12)]",
+        "        MergeExec",
+        "          HashAggregateExec: mode=Partial, gby=[c1], aggr=[MAX(c12), MIN(c12)]",
+        "            CoalesceBatchesExec: target_batch_size=4096",
+        "              FilterExec: c12 < CAST(10 AS Float64)",
+        "                RepartitionExec: partitioning=RoundRobinBatch(16)",

Review comment: What does the RepartitionExec do? Does it split the input into smaller batches to send to multiple threads?

########## File path: datafusion/src/physical_plan/mod.rs ##########
@@ -152,6 +162,133 @@ pub trait ExecutionPlan: Debug + Send + Sync {
     fn metrics(&self) -> HashMap<String, SQLMetric> {
         HashMap::new()
     }
+
+    /// Format this `ExecutionPlan` to `f` in the specified type.
+    ///
+    /// Should not include a newline
+    ///
+    /// Note this function prints a placeholder by default to preserve
+    /// backwards compatibility.
+    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "ExecutionPlan(PlaceHolder)")
+    }
+}
+
+/// Return a [wrapper](DisplayableExecutionPlan) around an
+/// [`ExecutionPlan`] which can be displayed in various easier to
+/// understand ways.
+///
+/// ```
+/// use datafusion::prelude::*;
+/// use datafusion::physical_plan::displayable;
+///
+/// // register the a table
+/// let mut ctx = ExecutionContext::new();
+/// ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()).unwrap();
+///
+/// // create a plan to run a SQL query
+/// let plan = ctx
+///    .create_logical_plan("SELECT a FROM example WHERE a < 5")
+///    .unwrap();
+/// let plan = ctx.optimize(&plan).unwrap();
+/// let physical_plan = ctx.create_physical_plan(&plan).unwrap();
+///
+/// // Format using display string
+/// let displayable_plan = displayable(physical_plan.as_ref());
+/// let plan_string = format!("{}", displayable_plan.indent());
+///
+/// assert_eq!("ProjectionExec: expr=[a]\
+///            \n  CoalesceBatchesExec: target_batch_size=4096\
+///            \n    FilterExec: a < 5\
+///            \n      RepartitionExec: partitioning=RoundRobinBatch(16)\
+///            \n        CsvExec: source=Path(tests/example.csv: [tests/example.csv]), has_header=true",
+///            plan_string.trim());
+/// ```
+///
+pub fn displayable(plan: &dyn ExecutionPlan) -> DisplayableExecutionPlan<'_> {
+    DisplayableExecutionPlan::new(plan)
+}
+
+/// Visit all children of this plan, according to the order defined on `ExecutionPlanVisitor`.
+// Note that this would be really nice if it were a method on
+// ExecutionPlan, but it can not be because it takes a generic
+// parameter and `ExecutionPlan` is a trait
+pub fn accept<V: ExecutionPlanVisitor>(
+    plan: &dyn ExecutionPlan,
+    visitor: &mut V,
+) -> std::result::Result<(), V::Error> {
+    visitor.pre_visit(plan)?;
+    for child in plan.children() {
+        visit_execution_plan(child.as_ref(), visitor)?;
+    }
+    visitor.post_visit(plan)?;
+    Ok(())
+}
+
+/// Trait that implements the [Visitor
+/// pattern](https://en.wikipedia.org/wiki/Visitor_pattern) for a
+/// depth first walk of `ExecutionPlan` nodes. `pre_visit` is called
+/// before any children are visited, and then `post_visit` is called
+/// after all children have been visited.
+////
+/// To use, define a struct that implements this trait and then invoke
+/// ['accept'].
+///
+/// For example, for an execution plan that looks like:
+///
+/// ```text
+/// ProjectionExec: #id
+///   FilterExec: state = CO
+///     CsvExec:
+/// ```
+///
+/// The sequence of visit operations would be:
+/// ```text
+/// visitor.pre_visit(ProjectionExec)
+/// visitor.pre_visit(FilterExec)
+/// visitor.pre_visit(CsvExec)
+/// visitor.post_visit(CsvExec)
+/// visitor.post_visit(FilterExec)
+/// visitor.post_visit(ProjectionExec)
+/// ```

Review comment: Nice
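As background for the `displayable(...).indent()` call in the doc example above: `indent()` appears to return a wrapper value whose `Display` implementation walks the plan tree, printing one operator per line with two extra spaces per level. The toy program below is only a sketch of that wrapper-with-Display pattern; `Node` and `Indented` are made-up stand-ins, not DataFusion types.

```rust
use std::fmt;

/// A made-up stand-in for a physical plan node (not a DataFusion type).
struct Node {
    name: &'static str,
    children: Vec<Node>,
}

/// Analogue of the wrapper returned by `displayable(...).indent()`:
/// its `Display` impl walks the tree, indenting two spaces per level.
struct Indented<'a>(&'a Node);

impl fmt::Display for Indented<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fn walk(node: &Node, depth: usize, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            // Pad with 2 spaces per nesting level, then print the operator name.
            writeln!(f, "{:indent$}{}", "", node.name, indent = depth * 2)?;
            for child in &node.children {
                walk(child, depth + 1, f)?;
            }
            Ok(())
        }
        walk(self.0, 0, f)
    }
}

fn main() {
    let plan = Node {
        name: "ProjectionExec: expr=[a]",
        children: vec![Node {
            name: "FilterExec: a < 5",
            children: vec![Node { name: "CsvExec: ...", children: vec![] }],
        }],
    };
    // Prints:
    // ProjectionExec: expr=[a]
    //   FilterExec: a < 5
    //     CsvExec: ...
    print!("{}", Indented(&plan));
}
```

Keeping the formatting on a separate wrapper, rather than implementing `Display` on `ExecutionPlan` itself, leaves room for several output styles, which is what the `DisplayFormatType` parameter of `fmt_as` in the hunk above suggests.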
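For the `ExecutionPlanVisitor` trait documented above, here is a minimal sketch of a concrete visitor. It assumes `pre_visit`/`post_visit` take `&dyn ExecutionPlan` and return `Result<bool, Self::Error>`, mirroring the existing `PlanVisitor` for `LogicalPlan`; the authoritative signatures are in the trait definition added by this PR, which is not quoted in the hunk above.

```rust
use datafusion::physical_plan::{accept, ExecutionPlan, ExecutionPlanVisitor};

/// Counts operators and tracks the deepest nesting level in a physical plan.
struct PlanStats {
    node_count: usize,
    depth: usize,
    max_depth: usize,
}

impl ExecutionPlanVisitor for PlanStats {
    // Nothing in this walk can fail, so any error type works here.
    type Error = std::convert::Infallible;

    fn pre_visit(&mut self, _plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
        // Called before the node's children, per the order documented above.
        self.node_count += 1;
        self.depth += 1;
        self.max_depth = self.max_depth.max(self.depth);
        Ok(true) // assumed: `true` means "keep visiting children"
    }

    fn post_visit(&mut self, _plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
        // Called after all of this node's children have been visited.
        self.depth -= 1;
        Ok(true)
    }
}

// Usage, assuming a `physical_plan: Arc<dyn ExecutionPlan>` is in scope:
// let mut stats = PlanStats { node_count: 0, depth: 0, max_depth: 0 };
// accept(physical_plan.as_ref(), &mut stats).unwrap();
// println!("{} operators, max depth {}", stats.node_count, stats.max_depth);
```

Because `accept` calls `pre_visit` before descending and `post_visit` after all children return, the depth counter rises and falls in exactly the order listed in the doc comment above.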
########## File path: datafusion/src/logical_plan/plan.rs ##########
@@ -356,13 +356,15 @@ pub enum Partitioning {
 /// after all children have been visited.
 ////
 /// To use, define a struct that implements this trait and then invoke
-/// "LogicalPlan::accept".
+/// [`LogicalPlan::accept`].

Review comment: Question: What does this change do? Does it just look better in the doc?

########## File path: datafusion/tests/sql.rs ##########
@@ -2885,3 +2885,45 @@ async fn test_cast_expressions_error() -> Result<()> {
     Ok(())
 }
+
+#[tokio::test]
+async fn test_physical_plan_display_indent() {
+    let mut ctx = ExecutionContext::new();
+    register_aggregate_csv(&mut ctx).unwrap();
+    let sql = "SELECT c1, MAX(c12), MIN(c12) as the_min \
+               FROM aggregate_test_100 \
+               WHERE c12 < 10 \
+               GROUP BY c1 \
+               ORDER BY the_min DESC \
+               LIMIT 10";
+    let plan = ctx.create_logical_plan(&sql).unwrap();
+    let plan = ctx.optimize(&plan).unwrap();
+
+    let physical_plan = ctx.create_physical_plan(&plan).unwrap();
+    let expected = vec![
+        "GlobalLimitExec: limit=10",
+        "  SortExec: [the_min DESC]",
+        "    ProjectionExec: expr=[c1, MAX(c12), MIN(c12) as the_min]",
+        "      HashAggregateExec: mode=Final, gby=[c1], aggr=[MAX(c12), MIN(c12)]",
+        "        MergeExec",
+        "          HashAggregateExec: mode=Partial, gby=[c1], aggr=[MAX(c12), MIN(c12)]",

Review comment: Great to see the partial aggregate displayed here

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org