This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new ee2498ff7a chore(15003): add identation to plans, to make easier to 
read (#15007)
ee2498ff7a is described below

commit ee2498ff7ab6202e023e8251c7b290db9200d9a5
Author: wiedld <[email protected]>
AuthorDate: Wed Mar 5 02:47:04 2025 -0800

    chore(15003): add identation to plans, to make easier to read (#15007)
---
 .../physical_optimizer/enforce_distribution.rs     | 1171 ++++++++++----------
 1 file changed, 590 insertions(+), 581 deletions(-)

diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs 
b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
index fc2394d889..85d826109f 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -19,11 +19,11 @@ use std::fmt::Debug;
 use std::ops::Deref;
 use std::sync::Arc;
 
+use crate::physical_optimizer::test_utils::parquet_exec_with_sort;
 use crate::physical_optimizer::test_utils::{
     check_integrity, coalesce_partitions_exec, repartition_exec, schema,
     sort_merge_join_exec, sort_preserving_merge_exec,
 };
-use crate::physical_optimizer::test_utils::{parquet_exec_with_sort, 
trim_plan_display};
 
 use arrow::compute::SortOptions;
 use datafusion::config::ConfigOptions;
@@ -61,7 +61,9 @@ use 
datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeE
 use datafusion_physical_plan::union::UnionExec;
 use datafusion_physical_plan::ExecutionPlanProperties;
 use datafusion_physical_plan::PlanProperties;
-use datafusion_physical_plan::{displayable, DisplayAs, DisplayFormatType, 
Statistics};
+use datafusion_physical_plan::{
+    get_plan_string, DisplayAs, DisplayFormatType, Statistics,
+};
 
 /// Models operators like BoundedWindowExec that require an input
 /// ordering but is easy to construct
@@ -358,8 +360,7 @@ fn ensure_distribution_helper(
 macro_rules! plans_matches_expected {
     ($EXPECTED_LINES: expr, $PLAN: expr) => {
         let physical_plan = $PLAN;
-        let formatted = 
displayable(physical_plan.as_ref()).indent(true).to_string();
-        let actual: Vec<&str> = formatted.trim().lines().collect();
+        let actual = get_plan_string(&physical_plan);
 
         let expected_plan_lines: Vec<&str> = $EXPECTED_LINES
             .iter().map(|s| *s).collect();
@@ -485,8 +486,7 @@ macro_rules! assert_optimized {
         let optimized = optimizer.optimize(optimized, &config)?;
 
         // Now format correctly
-        let plan = displayable(optimized.as_ref()).indent(true).to_string();
-        let actual_lines = trim_plan_display(&plan);
+        let actual_lines = get_plan_string(&optimized);
 
         assert_eq!(
             &expected_lines, &actual_lines,
@@ -500,8 +500,7 @@ macro_rules! assert_plan_txt {
     ($EXPECTED_LINES: expr, $PLAN: expr) => {
         let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| 
*s).collect();
         // Now format correctly
-        let plan = displayable($PLAN.as_ref()).indent(true).to_string();
-        let actual_lines = trim_plan_display(&plan);
+        let actual_lines = get_plan_string(&$PLAN);
 
         assert_eq!(
             &expected_lines, &actual_lines,
@@ -542,9 +541,11 @@ fn multi_hash_joins() -> Result<()> {
 
     for join_type in join_types {
         let join = hash_join_exec(left.clone(), right.clone(), &join_on, 
&join_type);
-        let join_plan = format!(
-            "HashJoinExec: mode=Partitioned, join_type={join_type}, on=[(a@0, 
b1@1)]"
-        );
+        let join_plan = |shift| -> String {
+            format!("{}HashJoinExec: mode=Partitioned, join_type={join_type}, 
on=[(a@0, b1@1)]", " ".repeat(shift))
+        };
+        let join_plan_indent2 = join_plan(2);
+        let join_plan_indent4 = join_plan(4);
 
         match join_type {
             JoinType::Inner
@@ -572,33 +573,33 @@ fn multi_hash_joins() -> Result<()> {
                     // Should include 3 RepartitionExecs
                     JoinType::Inner | JoinType::Left | JoinType::LeftSemi | 
JoinType::LeftAnti | JoinType::LeftMark => vec![
                         top_join_plan.as_str(),
-                        join_plan.as_str(),
-                        "RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10",
-                        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                        "DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-                        "RepartitionExec: partitioning=Hash([b1@1], 10), 
input_partitions=10",
-                        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                        "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as 
c1, d@3 as d1, e@4 as e1]",
-                        "DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-                        "RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10",
-                        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                        "DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+                        &join_plan_indent2,
+                        "    RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10",
+                        "      RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
+                        "        DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+                        "    RepartitionExec: partitioning=Hash([b1@1], 10), 
input_partitions=10",
+                        "      RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
+                        "        ProjectionExec: expr=[a@0 as a1, b@1 as b1, 
c@2 as c1, d@3 as d1, e@4 as e1]",
+                        "          DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+                        "  RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10",
+                        "    RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
+                        "      DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
                     ],
                     // Should include 4 RepartitionExecs
                     _ => vec![
                         top_join_plan.as_str(),
-                        "RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10",
-                        join_plan.as_str(),
-                        "RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10",
-                        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                        "DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-                        "RepartitionExec: partitioning=Hash([b1@1], 10), 
input_partitions=10",
-                        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                        "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as 
c1, d@3 as d1, e@4 as e1]",
-                        "DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-                        "RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10",
-                        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                        "DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+                        "  RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10",
+                        &join_plan_indent4,
+                        "      RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10",
+                        "        RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
+                        "          DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+                        "      RepartitionExec: partitioning=Hash([b1@1], 10), 
input_partitions=10",
+                        "        RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
+                        "          ProjectionExec: expr=[a@0 as a1, b@1 as b1, 
c@2 as c1, d@3 as d1, e@4 as e1]",
+                        "            DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+                        "  RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10",
+                        "    RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
+                        "      DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
                     ],
                 };
                 assert_optimized!(expected, top_join.clone(), true);
@@ -635,34 +636,34 @@ fn multi_hash_joins() -> Result<()> {
                     JoinType::Inner | JoinType::Right | JoinType::RightSemi | 
JoinType::RightAnti =>
                         vec![
                             top_join_plan.as_str(),
-                            join_plan.as_str(),
-                            "RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10",
-                            "RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
-                            "DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-                            "RepartitionExec: partitioning=Hash([b1@1], 10), 
input_partitions=10",
-                            "RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
-                            "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 
as c1, d@3 as d1, e@4 as e1]",
-                            "DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-                            "RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10",
-                            "RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
-                            "DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+                            &join_plan_indent2,
+                            "    RepartitionExec: partitioning=Hash([a@0], 
10), input_partitions=10",
+                            "      RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
+                            "        DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+                            "    RepartitionExec: partitioning=Hash([b1@1], 
10), input_partitions=10",
+                            "      RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
+                            "        ProjectionExec: expr=[a@0 as a1, b@1 as 
b1, c@2 as c1, d@3 as d1, e@4 as e1]",
+                            "          DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+                            "  RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10",
+                            "    RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
+                            "      DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
                         ],
                     // Should include 4 RepartitionExecs
                     _ =>
                         vec![
                             top_join_plan.as_str(),
-                            "RepartitionExec: partitioning=Hash([b1@6], 10), 
input_partitions=10",
-                            join_plan.as_str(),
-                            "RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10",
-                            "RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
-                            "DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-                            "RepartitionExec: partitioning=Hash([b1@1], 10), 
input_partitions=10",
-                            "RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
-                            "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 
as c1, d@3 as d1, e@4 as e1]",
-                            "DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-                            "RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10",
-                            "RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
-                            "DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+                            "  RepartitionExec: partitioning=Hash([b1@6], 10), 
input_partitions=10",
+                            &join_plan_indent4,
+                            "      RepartitionExec: partitioning=Hash([a@0], 
10), input_partitions=10",
+                            "        RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
+                            "          DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+                            "      RepartitionExec: partitioning=Hash([b1@1], 
10), input_partitions=10",
+                            "        RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
+                            "          ProjectionExec: expr=[a@0 as a1, b@1 as 
b1, c@2 as c1, d@3 as d1, e@4 as e1]",
+                            "            DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+                            "  RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10",
+                            "    RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
+                            "      DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
                         ],
                 };
                 assert_optimized!(expected, top_join.clone(), true);
@@ -710,17 +711,17 @@ fn multi_joins_after_alias() -> Result<()> {
     // Output partition need to respect the Alias and should not introduce 
additional RepartitionExec
     let expected = &[
         "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a1@0, c@2)]",
-        "ProjectionExec: expr=[a@0 as a1, a@0 as a2]",
-        "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]",
-        "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
-        "RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=10",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
-        "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "  ProjectionExec: expr=[a@0 as a1, a@0 as a2]",
+        "    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]",
+        "      RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10",
+        "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+        "      RepartitionExec: partitioning=Hash([b@1], 10), 
input_partitions=10",
+        "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+        "  RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
+        "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
     ];
     assert_optimized!(expected, top_join.clone(), true);
     assert_optimized!(expected, top_join, false);
@@ -736,17 +737,17 @@ fn multi_joins_after_alias() -> Result<()> {
     // Output partition need to respect the Alias and should not introduce 
additional RepartitionExec
     let expected = &[
         "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a2@1, c@2)]",
-        "ProjectionExec: expr=[a@0 as a1, a@0 as a2]",
-        "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]",
-        "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
-        "RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=10",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
-        "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "  ProjectionExec: expr=[a@0 as a1, a@0 as a2]",
+        "    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]",
+        "      RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10",
+        "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+        "      RepartitionExec: partitioning=Hash([b@1], 10), 
input_partitions=10",
+        "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+        "  RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
+        "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
     ];
     assert_optimized!(expected, top_join.clone(), true);
     assert_optimized!(expected, top_join, false);
@@ -787,19 +788,19 @@ fn multi_joins_after_multi_alias() -> Result<()> {
     // The original Output partition can not satisfy the Join requirements and 
need to add an additional RepartitionExec
     let expected = &[
         "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, c@2)]",
-        "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
-        "ProjectionExec: expr=[c1@0 as a]",
-        "ProjectionExec: expr=[c@2 as c1]",
-        "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]",
-        "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
-        "RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=10",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
-        "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "  RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
+        "    ProjectionExec: expr=[c1@0 as a]",
+        "      ProjectionExec: expr=[c@2 as c1]",
+        "        HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, 
b@1)]",
+        "          RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10",
+        "            RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "              DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+        "          RepartitionExec: partitioning=Hash([b@1], 10), 
input_partitions=10",
+        "            RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "              DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+        "  RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
+        "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
     ];
 
     assert_optimized!(expected, top_join.clone(), true);
@@ -831,16 +832,16 @@ fn join_after_agg_alias() -> Result<()> {
     // Only two RepartitionExecs added
     let expected = &[
         "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a1@0, a2@0)]",
-        "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
-        "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
-        "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
-        "AggregateExec: mode=FinalPartitioned, gby=[a2@0 as a2], aggr=[]",
-        "RepartitionExec: partitioning=Hash([a2@0], 10), input_partitions=10",
-        "AggregateExec: mode=Partial, gby=[a@0 as a2], aggr=[]",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "  AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
+        "    RepartitionExec: partitioning=Hash([a1@0], 10), 
input_partitions=10",
+        "      AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
+        "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+        "  AggregateExec: mode=FinalPartitioned, gby=[a2@0 as a2], aggr=[]",
+        "    RepartitionExec: partitioning=Hash([a2@0], 10), 
input_partitions=10",
+        "      AggregateExec: mode=Partial, gby=[a@0 as a2], aggr=[]",
+        "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
     assert_optimized!(expected, join.clone(), true);
     assert_optimized!(expected, join, false);
@@ -883,17 +884,17 @@ fn hash_join_key_ordering() -> Result<()> {
     // Only two RepartitionExecs added
     let expected = &[
         "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b1@1, b@0), 
(a1@0, a@1)]",
-        "ProjectionExec: expr=[a1@1 as a1, b1@0 as b1]",
-        "AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as a1], 
aggr=[]",
-        "RepartitionExec: partitioning=Hash([b1@0, a1@1], 10), 
input_partitions=10",
-        "AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
-        "AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], 
aggr=[]",
-        "RepartitionExec: partitioning=Hash([b@0, a@1], 10), 
input_partitions=10",
-        "AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "  ProjectionExec: expr=[a1@1 as a1, b1@0 as b1]",
+        "    AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as 
a1], aggr=[]",
+        "      RepartitionExec: partitioning=Hash([b1@0, a1@1], 10), 
input_partitions=10",
+        "        AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], 
aggr=[]",
+        "          RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "            DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+        "  AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], 
aggr=[]",
+        "    RepartitionExec: partitioning=Hash([b@0, a@1], 10), 
input_partitions=10",
+        "      AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]",
+        "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
     assert_optimized!(expected, join.clone(), true);
     assert_optimized!(expected, join, false);
@@ -1002,24 +1003,24 @@ fn multi_hash_join_key_ordering() -> Result<()> {
     // The bottom joins' join key ordering is adjusted based on the top join. 
And the top join should not introduce additional RepartitionExec
     let expected = &[
         "FilterExec: c@6 > 1",
-        "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(B@2, b1@6), 
(C@3, c@2), (AA@1, a1@5)]",
-        "ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
-        "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@1, b1@1), 
(c@2, c1@2), (a@0, a1@0)]",
-        "RepartitionExec: partitioning=Hash([b@1, c@2, a@0], 10), 
input_partitions=10",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
-        "RepartitionExec: partitioning=Hash([b1@1, c1@2, a1@0], 10), 
input_partitions=10",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
-        "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@1, b1@1), 
(c@2, c1@2), (a@0, a1@0)]",
-        "RepartitionExec: partitioning=Hash([b@1, c@2, a@0], 10), 
input_partitions=10",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
-        "RepartitionExec: partitioning=Hash([b1@1, c1@2, a1@0], 10), 
input_partitions=10",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "  HashJoinExec: mode=Partitioned, join_type=Inner, on=[(B@2, b1@6), 
(C@3, c@2), (AA@1, a1@5)]",
+        "    ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
+        "      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@1, 
b1@1), (c@2, c1@2), (a@0, a1@0)]",
+        "        RepartitionExec: partitioning=Hash([b@1, c@2, a@0], 10), 
input_partitions=10",
+        "          RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "            DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+        "        RepartitionExec: partitioning=Hash([b1@1, c1@2, a1@0], 10), 
input_partitions=10",
+        "          RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "            ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
+        "              DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+        "    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@1, b1@1), 
(c@2, c1@2), (a@0, a1@0)]",
+        "      RepartitionExec: partitioning=Hash([b@1, c@2, a@0], 10), 
input_partitions=10",
+        "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+        "      RepartitionExec: partitioning=Hash([b1@1, c1@2, a1@0], 10), 
input_partitions=10",
+        "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "          ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
+        "            DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
     assert_optimized!(expected, filter_top_join.clone(), true);
     assert_optimized!(expected, filter_top_join, false);
@@ -1141,23 +1142,23 @@ fn reorder_join_keys_to_left_input() -> Result<()> {
         // The top joins' join key ordering is adjusted based on the children 
inputs.
         let expected = &[
             top_join_plan.as_str(),
-            "ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
-            "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a1@0), 
(b@1, b1@1), (c@2, c1@2)]",
-            "RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 10), 
input_partitions=10",
-            "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-            "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
-            "RepartitionExec: partitioning=Hash([a1@0, b1@1, c1@2], 10), 
input_partitions=10",
-            "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-            "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
-            "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
-            "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@2, c1@2), 
(b@1, b1@1), (a@0, a1@0)]",
-            "RepartitionExec: partitioning=Hash([c@2, b@1, a@0], 10), 
input_partitions=10",
-            "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-            "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
-            "RepartitionExec: partitioning=Hash([c1@2, b1@1, a1@0], 10), 
input_partitions=10",
-            "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-            "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
-            "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
+            "  ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
+            "    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, 
a1@0), (b@1, b1@1), (c@2, c1@2)]",
+            "      RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 10), 
input_partitions=10",
+            "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+            "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+            "      RepartitionExec: partitioning=Hash([a1@0, b1@1, c1@2], 10), 
input_partitions=10",
+            "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+            "          ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
+            "            DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+            "  HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@2, 
c1@2), (b@1, b1@1), (a@0, a1@0)]",
+            "    RepartitionExec: partitioning=Hash([c@2, b@1, a@0], 10), 
input_partitions=10",
+            "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+            "        DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+            "    RepartitionExec: partitioning=Hash([c1@2, b1@1, a1@0], 10), 
input_partitions=10",
+            "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+            "        ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
+            "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
         ];
 
         assert_plan_txt!(expected, reordered);
@@ -1275,23 +1276,23 @@ fn reorder_join_keys_to_right_input() -> Result<()> {
         // The top joins' join key ordering is adjusted based on the children 
inputs.
         let expected = &[
             top_join_plan.as_str(),
-            "ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
-            "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a1@0), 
(b@1, b1@1)]",
-            "RepartitionExec: partitioning=Hash([a@0, b@1], 10), 
input_partitions=10",
-            "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-            "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
-            "RepartitionExec: partitioning=Hash([a1@0, b1@1], 10), 
input_partitions=10",
-            "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-            "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
-            "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
-            "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@2, c1@2), 
(b@1, b1@1), (a@0, a1@0)]",
-            "RepartitionExec: partitioning=Hash([c@2, b@1, a@0], 10), 
input_partitions=10",
-            "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-            "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
-            "RepartitionExec: partitioning=Hash([c1@2, b1@1, a1@0], 10), 
input_partitions=10",
-            "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-            "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
-            "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
+            "  ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
+            "    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, 
a1@0), (b@1, b1@1)]",
+            "      RepartitionExec: partitioning=Hash([a@0, b@1], 10), 
input_partitions=10",
+            "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+            "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+            "      RepartitionExec: partitioning=Hash([a1@0, b1@1], 10), 
input_partitions=10",
+            "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+            "          ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
+            "            DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+            "  HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@2, 
c1@2), (b@1, b1@1), (a@0, a1@0)]",
+            "    RepartitionExec: partitioning=Hash([c@2, b@1, a@0], 10), 
input_partitions=10",
+            "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+            "        DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+            "    RepartitionExec: partitioning=Hash([c1@2, b1@1, a1@0], 10), 
input_partitions=10",
+            "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+            "        ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
+            "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
         ];
 
         assert_plan_txt!(expected, reordered);
@@ -1331,7 +1332,15 @@ fn multi_smj_joins() -> Result<()> {
     for join_type in join_types {
         let join =
             sort_merge_join_exec(left.clone(), right.clone(), &join_on, 
&join_type);
-        let join_plan = format!("SortMergeJoin: join_type={join_type}, 
on=[(a@0, b1@1)]");
+        let join_plan = |shift| -> String {
+            format!(
+                "{}SortMergeJoin: join_type={join_type}, on=[(a@0, b1@1)]",
+                " ".repeat(shift)
+            )
+        };
+        let join_plan_indent2 = join_plan(2);
+        let join_plan_indent6 = join_plan(6);
+        let join_plan_indent10 = join_plan(10);
 
         // Top join on (a == c)
         let top_join_on = vec![(
@@ -1348,20 +1357,20 @@ fn multi_smj_joins() -> Result<()> {
             JoinType::Inner | JoinType::Left | JoinType::LeftSemi | 
JoinType::LeftAnti =>
                 vec![
                     top_join_plan.as_str(),
-                    join_plan.as_str(),
-                    "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
-                    "RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10",
-                    "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                    "DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-                    "SortExec: expr=[b1@1 ASC], preserve_partitioning=[true]",
-                    "RepartitionExec: partitioning=Hash([b1@1], 10), 
input_partitions=10",
-                    "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                    "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, 
d@3 as d1, e@4 as e1]",
-                    "DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-                    "SortExec: expr=[c@2 ASC], preserve_partitioning=[true]",
-                    "RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10",
-                    "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                    "DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+                    &join_plan_indent2,
+                    "    SortExec: expr=[a@0 ASC], 
preserve_partitioning=[true]",
+                    "      RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10",
+                    "        RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
+                    "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+                    "    SortExec: expr=[b1@1 ASC], 
preserve_partitioning=[true]",
+                    "      RepartitionExec: partitioning=Hash([b1@1], 10), 
input_partitions=10",
+                    "        RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
+                    "          ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 
as c1, d@3 as d1, e@4 as e1]",
+                    "            DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+                    "  SortExec: expr=[c@2 ASC], preserve_partitioning=[true]",
+                    "    RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10",
+                    "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+                    "        DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
                 ],
             // Should include 7 RepartitionExecs (4 hash, 3 round-robin), 4 
SortExecs
             // Since ordering of the left child is not preserved after 
SortMergeJoin
@@ -1375,22 +1384,22 @@ fn multi_smj_joins() -> Result<()> {
             _ => vec![
                     top_join_plan.as_str(),
                     // Below 2 operators are differences introduced, when join 
mode is changed
-                    "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
-                    "RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10",
-                    join_plan.as_str(),
-                    "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
-                    "RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10",
-                    "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                    "DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-                    "SortExec: expr=[b1@1 ASC], preserve_partitioning=[true]",
-                    "RepartitionExec: partitioning=Hash([b1@1], 10), 
input_partitions=10",
-                    "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                    "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, 
d@3 as d1, e@4 as e1]",
-                    "DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-                    "SortExec: expr=[c@2 ASC], preserve_partitioning=[true]",
-                    "RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10",
-                    "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                    "DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+                    "  SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
+                    "    RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10",
+                    &join_plan_indent6,
+                    "        SortExec: expr=[a@0 ASC], 
preserve_partitioning=[true]",
+                    "          RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10",
+                    "            RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
+                    "              DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+                    "        SortExec: expr=[b1@1 ASC], 
preserve_partitioning=[true]",
+                    "          RepartitionExec: partitioning=Hash([b1@1], 10), 
input_partitions=10",
+                    "            RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
+                    "              ProjectionExec: expr=[a@0 as a1, b@1 as b1, 
c@2 as c1, d@3 as d1, e@4 as e1]",
+                    "                DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+                    "  SortExec: expr=[c@2 ASC], preserve_partitioning=[true]",
+                    "    RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10",
+                    "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+                    "        DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
             ],
         };
         assert_optimized!(expected, top_join.clone(), true, true);
@@ -1400,20 +1409,20 @@ fn multi_smj_joins() -> Result<()> {
             JoinType::Inner | JoinType::Left | JoinType::LeftSemi | 
JoinType::LeftAnti =>
                 vec![
                     top_join_plan.as_str(),
-                    join_plan.as_str(),
-                    "RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
-                    "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                    "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
-                    "DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-                    "RepartitionExec: partitioning=Hash([b1@1], 10), 
input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC",
-                    "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                    "SortExec: expr=[b1@1 ASC], preserve_partitioning=[false]",
-                    "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, 
d@3 as d1, e@4 as e1]",
-                    "DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-                    "RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC",
-                    "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                    "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
-                    "DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+                    &join_plan_indent2,
+                    "    RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
+                    "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+                    "        SortExec: expr=[a@0 ASC], 
preserve_partitioning=[false]",
+                    "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+                    "    RepartitionExec: partitioning=Hash([b1@1], 10), 
input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC",
+                    "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+                    "        SortExec: expr=[b1@1 ASC], 
preserve_partitioning=[false]",
+                    "          ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 
as c1, d@3 as d1, e@4 as e1]",
+                    "            DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+                    "  RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC",
+                    "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+                    "      SortExec: expr=[c@2 ASC], 
preserve_partitioning=[false]",
+                    "        DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
                 ],
             // Should include 8 RepartitionExecs (4 hash, 8 round-robin), 4 
SortExecs
             // Since ordering of the left child is not preserved after 
SortMergeJoin
@@ -1427,24 +1436,24 @@ fn multi_smj_joins() -> Result<()> {
             _ => vec![
                 top_join_plan.as_str(),
                 // Below 4 operators are differences introduced, when join 
mode is changed
-                "RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
-                "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
-                "CoalescePartitionsExec",
-                join_plan.as_str(),
-                "RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
-                "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
-                "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], file_type=parquet",
-                "RepartitionExec: partitioning=Hash([b1@1], 10), 
input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC",
-                "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                "SortExec: expr=[b1@1 ASC], preserve_partitioning=[false]",
-                "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as 
d1, e@4 as e1]",
-                "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], file_type=parquet",
-                "RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC",
-                "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
-                "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], file_type=parquet",
+                "  RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
+                "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+                "      SortExec: expr=[a@0 ASC], 
preserve_partitioning=[false]",
+                "        CoalescePartitionsExec",
+                &join_plan_indent10,
+                "            RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
+                "              RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
+                "                SortExec: expr=[a@0 ASC], 
preserve_partitioning=[false]",
+                "                  DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+                "            RepartitionExec: partitioning=Hash([b1@1], 10), 
input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC",
+                "              RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
+                "                SortExec: expr=[b1@1 ASC], 
preserve_partitioning=[false]",
+                "                  ProjectionExec: expr=[a@0 as a1, b@1 as b1, 
c@2 as c1, d@3 as d1, e@4 as e1]",
+                "                    DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+                "  RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC",
+                "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+                "      SortExec: expr=[c@2 ASC], 
preserve_partitioning=[false]",
+                "        DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
             ],
         };
         assert_optimized!(expected_first_sort_enforcement, top_join, false, 
true);
@@ -1466,40 +1475,40 @@ fn multi_smj_joins() -> Result<()> {
                     // Should include 6 RepartitionExecs(3 hash, 3 
round-robin) and 3 SortExecs
                     JoinType::Inner | JoinType::Right => vec![
                         top_join_plan.as_str(),
-                        join_plan.as_str(),
-                        "SortExec: expr=[a@0 ASC], 
preserve_partitioning=[true]",
-                        "RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10",
-                        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                        "DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-                        "SortExec: expr=[b1@1 ASC], 
preserve_partitioning=[true]",
-                        "RepartitionExec: partitioning=Hash([b1@1], 10), 
input_partitions=10",
-                        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                        "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as 
c1, d@3 as d1, e@4 as e1]",
-                        "DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-                        "SortExec: expr=[c@2 ASC], 
preserve_partitioning=[true]",
-                        "RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10",
-                        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                        "DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+                        &join_plan_indent2,
+                        "    SortExec: expr=[a@0 ASC], 
preserve_partitioning=[true]",
+                        "      RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10",
+                        "        RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
+                        "          DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+                        "    SortExec: expr=[b1@1 ASC], 
preserve_partitioning=[true]",
+                        "      RepartitionExec: partitioning=Hash([b1@1], 10), 
input_partitions=10",
+                        "        RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
+                        "          ProjectionExec: expr=[a@0 as a1, b@1 as b1, 
c@2 as c1, d@3 as d1, e@4 as e1]",
+                        "            DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+                        "  SortExec: expr=[c@2 ASC], 
preserve_partitioning=[true]",
+                        "    RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10",
+                        "      RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
+                        "        DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
                     ],
                     // Should include 7 RepartitionExecs (4 hash, 3 
round-robin) and 4 SortExecs
                     JoinType::Left | JoinType::Full => vec![
                         top_join_plan.as_str(),
-                        "SortExec: expr=[b1@6 ASC], 
preserve_partitioning=[true]",
-                        "RepartitionExec: partitioning=Hash([b1@6], 10), 
input_partitions=10",
-                        join_plan.as_str(),
-                        "SortExec: expr=[a@0 ASC], 
preserve_partitioning=[true]",
-                        "RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10",
-                        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                        "DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-                        "SortExec: expr=[b1@1 ASC], 
preserve_partitioning=[true]",
-                        "RepartitionExec: partitioning=Hash([b1@1], 10), 
input_partitions=10",
-                        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                        "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as 
c1, d@3 as d1, e@4 as e1]",
-                        "DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-                        "SortExec: expr=[c@2 ASC], 
preserve_partitioning=[true]",
-                        "RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10",
-                        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                        "DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+                        "  SortExec: expr=[b1@6 ASC], 
preserve_partitioning=[true]",
+                        "    RepartitionExec: partitioning=Hash([b1@6], 10), 
input_partitions=10",
+                        &join_plan_indent6,
+                        "        SortExec: expr=[a@0 ASC], 
preserve_partitioning=[true]",
+                        "          RepartitionExec: partitioning=Hash([a@0], 
10), input_partitions=10",
+                        "            RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
+                        "              DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+                        "        SortExec: expr=[b1@1 ASC], 
preserve_partitioning=[true]",
+                        "          RepartitionExec: partitioning=Hash([b1@1], 
10), input_partitions=10",
+                        "            RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
+                        "              ProjectionExec: expr=[a@0 as a1, b@1 as 
b1, c@2 as c1, d@3 as d1, e@4 as e1]",
+                        "                DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+                        "  SortExec: expr=[c@2 ASC], 
preserve_partitioning=[true]",
+                        "    RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10",
+                        "      RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
+                        "        DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
                     ],
                     // this match arm cannot be reached
                     _ => unreachable!()
@@ -1510,42 +1519,42 @@ fn multi_smj_joins() -> Result<()> {
                     // Should include 6 RepartitionExecs (3 of them preserves 
order) and 3 SortExecs
                     JoinType::Inner | JoinType::Right => vec![
                         top_join_plan.as_str(),
-                        join_plan.as_str(),
-                        "RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
-                        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                        "SortExec: expr=[a@0 ASC], 
preserve_partitioning=[false]",
-                        "DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-                        "RepartitionExec: partitioning=Hash([b1@1], 10), 
input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC",
-                        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                        "SortExec: expr=[b1@1 ASC], 
preserve_partitioning=[false]",
-                        "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as 
c1, d@3 as d1, e@4 as e1]",
-                        "DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-                        "RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC",
-                        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                        "SortExec: expr=[c@2 ASC], 
preserve_partitioning=[false]",
-                        "DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+                        &join_plan_indent2,
+                        "    RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
+                        "      RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
+                        "        SortExec: expr=[a@0 ASC], 
preserve_partitioning=[false]",
+                        "          DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+                        "    RepartitionExec: partitioning=Hash([b1@1], 10), 
input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC",
+                        "      RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
+                        "        SortExec: expr=[b1@1 ASC], 
preserve_partitioning=[false]",
+                        "          ProjectionExec: expr=[a@0 as a1, b@1 as b1, 
c@2 as c1, d@3 as d1, e@4 as e1]",
+                        "            DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+                        "  RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC",
+                        "    RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
+                        "      SortExec: expr=[c@2 ASC], 
preserve_partitioning=[false]",
+                        "        DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
                     ],
                     // Should include 8 RepartitionExecs (4 of them preserves 
order) and 4 SortExecs
                     JoinType::Left | JoinType::Full => vec![
                         top_join_plan.as_str(),
-                        "RepartitionExec: partitioning=Hash([b1@6], 10), 
input_partitions=10, preserve_order=true, sort_exprs=b1@6 ASC",
-                        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                        "SortExec: expr=[b1@6 ASC], 
preserve_partitioning=[false]",
-                        "CoalescePartitionsExec",
-                        join_plan.as_str(),
-                        "RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
-                        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                        "SortExec: expr=[a@0 ASC], 
preserve_partitioning=[false]",
-                        "DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-                        "RepartitionExec: partitioning=Hash([b1@1], 10), 
input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC",
-                        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                        "SortExec: expr=[b1@1 ASC], 
preserve_partitioning=[false]",
-                        "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as 
c1, d@3 as d1, e@4 as e1]",
-                        "DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-                        "RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC",
-                        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                        "SortExec: expr=[c@2 ASC], 
preserve_partitioning=[false]",
-                        "DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+                        "  RepartitionExec: partitioning=Hash([b1@6], 10), 
input_partitions=10, preserve_order=true, sort_exprs=b1@6 ASC",
+                        "    RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
+                        "      SortExec: expr=[b1@6 ASC], 
preserve_partitioning=[false]",
+                        "        CoalescePartitionsExec",
+                        &join_plan_indent10,
+                        "            RepartitionExec: partitioning=Hash([a@0], 
10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
+                        "              RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
+                        "                SortExec: expr=[a@0 ASC], 
preserve_partitioning=[false]",
+                        "                  DataSourceExec: file_groups={1 
group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+                        "            RepartitionExec: 
partitioning=Hash([b1@1], 10), input_partitions=10, preserve_order=true, 
sort_exprs=b1@1 ASC",
+                        "              RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
+                        "                SortExec: expr=[b1@1 ASC], 
preserve_partitioning=[false]",
+                        "                  ProjectionExec: expr=[a@0 as a1, 
b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
+                        "                    DataSourceExec: file_groups={1 
group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+                        "  RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC",
+                        "    RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
+                        "      SortExec: expr=[c@2 ASC], 
preserve_partitioning=[false]",
+                        "        DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
                     ],
                     // this match arm cannot be reached
                     _ => unreachable!()
@@ -1608,47 +1617,47 @@ fn smj_join_key_ordering() -> Result<()> {
     // Only two RepartitionExecs added
     let expected = &[
         "SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)]",
-        "SortExec: expr=[b3@1 ASC, a3@0 ASC], preserve_partitioning=[true]",
-        "ProjectionExec: expr=[a1@0 as a3, b1@1 as b3]",
-        "ProjectionExec: expr=[a1@1 as a1, b1@0 as b1]",
-        "AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as a1], 
aggr=[]",
-        "RepartitionExec: partitioning=Hash([b1@0, a1@1], 10), 
input_partitions=10",
-        "AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
-        "SortExec: expr=[b2@1 ASC, a2@0 ASC], preserve_partitioning=[true]",
-        "ProjectionExec: expr=[a@1 as a2, b@0 as b2]",
-        "AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], 
aggr=[]",
-        "RepartitionExec: partitioning=Hash([b@0, a@1], 10), 
input_partitions=10",
-        "AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "  SortExec: expr=[b3@1 ASC, a3@0 ASC], preserve_partitioning=[true]",
+        "    ProjectionExec: expr=[a1@0 as a3, b1@1 as b3]",
+        "      ProjectionExec: expr=[a1@1 as a1, b1@0 as b1]",
+        "        AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 
as a1], aggr=[]",
+        "          RepartitionExec: partitioning=Hash([b1@0, a1@1], 10), 
input_partitions=10",
+        "            AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], 
aggr=[]",
+        "              RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "                DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+        "  SortExec: expr=[b2@1 ASC, a2@0 ASC], preserve_partitioning=[true]",
+        "    ProjectionExec: expr=[a@1 as a2, b@0 as b2]",
+        "      AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], 
aggr=[]",
+        "        RepartitionExec: partitioning=Hash([b@0, a@1], 10), 
input_partitions=10",
+        "          AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], 
aggr=[]",
+        "            RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "              DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
     assert_optimized!(expected, join.clone(), true, true);
 
     let expected_first_sort_enforcement = &[
         "SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)]",
-        "RepartitionExec: partitioning=Hash([b3@1, a3@0], 10), 
input_partitions=10, preserve_order=true, sort_exprs=b3@1 ASC, a3@0 ASC",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "SortExec: expr=[b3@1 ASC, a3@0 ASC], preserve_partitioning=[false]",
-        "CoalescePartitionsExec",
-        "ProjectionExec: expr=[a1@0 as a3, b1@1 as b3]",
-        "ProjectionExec: expr=[a1@1 as a1, b1@0 as b1]",
-        "AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as a1], 
aggr=[]",
-        "RepartitionExec: partitioning=Hash([b1@0, a1@1], 10), 
input_partitions=10",
-        "AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
-        "RepartitionExec: partitioning=Hash([b2@1, a2@0], 10), 
input_partitions=10, preserve_order=true, sort_exprs=b2@1 ASC, a2@0 ASC",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "SortExec: expr=[b2@1 ASC, a2@0 ASC], preserve_partitioning=[false]",
-        "CoalescePartitionsExec",
-        "ProjectionExec: expr=[a@1 as a2, b@0 as b2]",
-        "AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], 
aggr=[]",
-        "RepartitionExec: partitioning=Hash([b@0, a@1], 10), 
input_partitions=10",
-        "AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "  RepartitionExec: partitioning=Hash([b3@1, a3@0], 10), 
input_partitions=10, preserve_order=true, sort_exprs=b3@1 ASC, a3@0 ASC",
+        "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "      SortExec: expr=[b3@1 ASC, a3@0 ASC], 
preserve_partitioning=[false]",
+        "        CoalescePartitionsExec",
+        "          ProjectionExec: expr=[a1@0 as a3, b1@1 as b3]",
+        "            ProjectionExec: expr=[a1@1 as a1, b1@0 as b1]",
+        "              AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, 
a1@1 as a1], aggr=[]",
+        "                RepartitionExec: partitioning=Hash([b1@0, a1@1], 10), 
input_partitions=10",
+        "                  AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as 
a1], aggr=[]",
+        "                    RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
+        "                      DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+        "  RepartitionExec: partitioning=Hash([b2@1, a2@0], 10), 
input_partitions=10, preserve_order=true, sort_exprs=b2@1 ASC, a2@0 ASC",
+        "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "      SortExec: expr=[b2@1 ASC, a2@0 ASC], 
preserve_partitioning=[false]",
+        "        CoalescePartitionsExec",
+        "          ProjectionExec: expr=[a@1 as a2, b@0 as b2]",
+        "            AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 
as a], aggr=[]",
+        "              RepartitionExec: partitioning=Hash([b@0, a@1], 10), 
input_partitions=10",
+        "                AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as 
a], aggr=[]",
+        "                  RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "                    DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
     assert_optimized!(expected_first_sort_enforcement, join, false, true);
 
@@ -1678,8 +1687,8 @@ fn merge_does_not_need_sort() -> Result<()> {
     // data is already sorted
     let expected = &[
         "SortPreservingMergeExec: [a@0 ASC]",
-        "CoalesceBatchesExec: target_batch_size=4096",
-        "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, 
c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
+        "  CoalesceBatchesExec: target_batch_size=4096",
+        "    DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
     ];
     assert_optimized!(expected, exec, true);
 
@@ -1689,9 +1698,9 @@ fn merge_does_not_need_sort() -> Result<()> {
     // SortExec at the top.
     let expected = &[
         "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
-        "CoalescePartitionsExec",
-        "CoalesceBatchesExec: target_batch_size=4096",
-        "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, 
c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
+        "  CoalescePartitionsExec",
+        "    CoalesceBatchesExec: target_batch_size=4096",
+        "      DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
     ];
     assert_optimized!(expected, exec, false);
 
@@ -1721,18 +1730,18 @@ fn union_to_interleave() -> Result<()> {
     // Only two RepartitionExecs added, no final RepartitionExec required
     let expected = &[
         "AggregateExec: mode=FinalPartitioned, gby=[a2@0 as a2], aggr=[]",
-        "AggregateExec: mode=Partial, gby=[a1@0 as a2], aggr=[]",
-        "InterleaveExec",
-        "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
-        "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
-        "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
-        "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
-        "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
-        "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "  AggregateExec: mode=Partial, gby=[a1@0 as a2], aggr=[]",
+        "    InterleaveExec",
+        "      AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], 
aggr=[]",
+        "        RepartitionExec: partitioning=Hash([a1@0], 10), 
input_partitions=10",
+        "          AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
+        "            RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "              DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+        "      AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], 
aggr=[]",
+        "        RepartitionExec: partitioning=Hash([a1@0], 10), 
input_partitions=10",
+        "          AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
+        "            RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "              DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
     assert_optimized!(expected, plan.clone(), true);
     assert_optimized!(expected, plan.clone(), false);
@@ -1763,19 +1772,19 @@ fn union_not_to_interleave() -> Result<()> {
     // Only two RepartitionExecs added, no final RepartitionExec required
     let expected = &[
         "AggregateExec: mode=FinalPartitioned, gby=[a2@0 as a2], aggr=[]",
-        "RepartitionExec: partitioning=Hash([a2@0], 10), input_partitions=20",
-        "AggregateExec: mode=Partial, gby=[a1@0 as a2], aggr=[]",
-        "UnionExec",
-        "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
-        "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
-        "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
-        "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
-        "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
-        "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "  RepartitionExec: partitioning=Hash([a2@0], 10), 
input_partitions=20",
+        "    AggregateExec: mode=Partial, gby=[a1@0 as a2], aggr=[]",
+        "      UnionExec",
+        "        AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], 
aggr=[]",
+        "          RepartitionExec: partitioning=Hash([a1@0], 10), 
input_partitions=10",
+        "            AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
+        "              RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "                DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+        "        AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], 
aggr=[]",
+        "          RepartitionExec: partitioning=Hash([a1@0], 10), 
input_partitions=10",
+        "            AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
+        "              RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "                DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
     // no sort in the plan but since we need it as a parameter, make it 
default false
     let prefer_existing_sort = false;
@@ -1807,10 +1816,10 @@ fn added_repartition_to_single_partition() -> 
Result<()> {
 
     let expected = [
         "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
-        "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
-        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "  RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
+        "    AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
+        "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], file_type=parquet",
     ];
     assert_optimized!(expected, plan.clone(), true);
     assert_optimized!(expected, plan, false);
@@ -1825,11 +1834,11 @@ fn repartition_deepest_node() -> Result<()> {
 
     let expected = &[
         "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
-        "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
-        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
-        "FilterExec: c@2 = 0",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "  RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
+        "    AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
+        "      FilterExec: c@2 = 0",
+        "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
     assert_optimized!(expected, plan.clone(), true);
     assert_optimized!(expected, plan, false);
@@ -1844,12 +1853,12 @@ fn repartition_unsorted_limit() -> Result<()> {
 
     let expected = &[
         "GlobalLimitExec: skip=0, fetch=100",
-        "CoalescePartitionsExec",
-        "LocalLimitExec: fetch=100",
-        "FilterExec: c@2 = 0",
+        "  CoalescePartitionsExec",
+        "    LocalLimitExec: fetch=100",
+        "      FilterExec: c@2 = 0",
         // nothing sorts the data, so the local limit doesn't require sorted 
data either
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
 
     assert_optimized!(expected, plan.clone(), true);
@@ -1869,10 +1878,10 @@ fn repartition_sorted_limit() -> Result<()> {
 
     let expected = &[
         "GlobalLimitExec: skip=0, fetch=100",
-        "LocalLimitExec: fetch=100",
+        "  LocalLimitExec: fetch=100",
         // data is sorted so can't repartition here
-        "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "    SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
+        "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
     ];
     assert_optimized!(expected, plan.clone(), true);
     assert_optimized!(expected, plan, false);
@@ -1894,12 +1903,12 @@ fn repartition_sorted_limit_with_filter() -> Result<()> 
{
 
     let expected = &[
         "SortRequiredExec: [c@2 ASC]",
-        "FilterExec: c@2 = 0",
+        "  FilterExec: c@2 = 0",
         // We can use repartition here, ordering requirement by 
SortRequiredExec
         // is still satisfied.
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "      SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
+        "        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], file_type=parquet",
     ];
 
     assert_optimized!(expected, plan.clone(), true);
@@ -1918,19 +1927,19 @@ fn repartition_ignores_limit() -> Result<()> {
 
     let expected = &[
         "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
-        "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
-        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "GlobalLimitExec: skip=0, fetch=100",
-        "CoalescePartitionsExec",
-        "LocalLimitExec: fetch=100",
-        "FilterExec: c@2 = 0",
+        "  RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
+        "    AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
+        "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "        GlobalLimitExec: skip=0, fetch=100",
+        "          CoalescePartitionsExec",
+        "            LocalLimitExec: fetch=100",
+        "              FilterExec: c@2 = 0",
         // repartition should happen prior to the filter to maximize 
parallelism
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "GlobalLimitExec: skip=0, fetch=100",
-        "LocalLimitExec: fetch=100",
+        "                RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "                  GlobalLimitExec: skip=0, fetch=100",
+        "                    LocalLimitExec: fetch=100",
         // Expect no repartition to happen for local limit
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "                      DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
     assert_optimized!(expected, plan.clone(), true);
     assert_optimized!(expected, plan, false);
@@ -1945,11 +1954,11 @@ fn repartition_ignores_union() -> Result<()> {
     let expected = &[
         "UnionExec",
         // Expect no repartition of DataSourceExec
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=parquet",
+        "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=parquet",
+        "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=parquet",
+        "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=parquet",
+        "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=parquet",
     ];
 
     assert_optimized!(expected, plan.clone(), true);
@@ -1971,7 +1980,7 @@ fn repartition_through_sort_preserving_merge() -> 
Result<()> {
     // need resort as the data was not sorted correctly
     let expected = &[
         "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=parquet",
     ];
     assert_optimized!(expected, plan.clone(), true);
     assert_optimized!(expected, plan, false);
@@ -1996,15 +2005,15 @@ fn repartition_ignores_sort_preserving_merge() -> 
Result<()> {
     // should not repartition, since increased parallelism is not beneficial 
for SortPReservingMerge
     let expected = &[
         "SortPreservingMergeExec: [c@2 ASC]",
-        "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
+        "  DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, 
b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
 
     assert_optimized!(expected, plan.clone(), true);
 
     let expected = &[
         "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
-        "CoalescePartitionsExec",
-        "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
+        "  CoalescePartitionsExec",
+        "    DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
     assert_optimized!(expected, plan, false);
 
@@ -2025,19 +2034,19 @@ fn 
repartition_ignores_sort_preserving_merge_with_union() -> Result<()> {
     // should not repartition / sort (as the data was already sorted)
     let expected = &[
         "SortPreservingMergeExec: [c@2 ASC]",
-        "UnionExec",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], output_ordering=[c@2 ASC], file_type=parquet",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], output_ordering=[c@2 ASC], file_type=parquet",
+        "  UnionExec",
+        "    DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
+        "    DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
 
     assert_optimized!(expected, plan.clone(), true);
 
     let expected = &[
         "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
-        "CoalescePartitionsExec",
-        "UnionExec",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], output_ordering=[c@2 ASC], file_type=parquet",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], output_ordering=[c@2 ASC], file_type=parquet",
+        "  CoalescePartitionsExec",
+        "    UnionExec",
+        "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
+        "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
     assert_optimized!(expected, plan, false);
 
@@ -2061,9 +2070,9 @@ fn repartition_does_not_destroy_sort() -> Result<()> {
     // during repartitioning ordering is preserved
     let expected = &[
         "SortRequiredExec: [d@3 ASC]",
-        "FilterExec: c@2 = 0",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], output_ordering=[d@3 ASC], file_type=parquet",
+        "  FilterExec: c@2 = 0",
+        "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[d@3 ASC], file_type=parquet",
     ];
 
     assert_optimized!(expected, plan.clone(), true, true);
@@ -2100,12 +2109,12 @@ fn repartition_does_not_destroy_sort_more_complex() -> 
Result<()> {
     let expected = &[
         "UnionExec",
         // union input 1: no repartitioning
-        "SortRequiredExec: [c@2 ASC]",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], output_ordering=[c@2 ASC], file_type=parquet",
+        "  SortRequiredExec: [c@2 ASC]",
+        "    DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
         // union input 2: should repartition
-        "FilterExec: c@2 = 0",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "  FilterExec: c@2 = 0",
+        "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
     ];
     assert_optimized!(expected, plan.clone(), true);
     assert_optimized!(expected, plan, false);
@@ -2134,22 +2143,22 @@ fn repartition_transitively_with_projection() -> 
Result<()> {
 
     let expected = &[
         "SortPreservingMergeExec: [sum@0 ASC]",
-        "SortExec: expr=[sum@0 ASC], preserve_partitioning=[true]",
+        "  SortExec: expr=[sum@0 ASC], preserve_partitioning=[true]",
         // Since this projection is not trivial, increasing parallelism is 
beneficial
-        "ProjectionExec: expr=[a@0 + b@1 as sum]",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "    ProjectionExec: expr=[a@0 + b@1 as sum]",
+        "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], file_type=parquet",
     ];
 
     assert_optimized!(expected, plan.clone(), true);
 
     let expected_first_sort_enforcement = &[
         "SortExec: expr=[sum@0 ASC], preserve_partitioning=[false]",
-        "CoalescePartitionsExec",
+        "  CoalescePartitionsExec",
         // Since this projection is not trivial, increasing parallelism is 
beneficial
-        "ProjectionExec: expr=[a@0 + b@1 as sum]",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "    ProjectionExec: expr=[a@0 + b@1 as sum]",
+        "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], file_type=parquet",
     ];
     assert_optimized!(expected_first_sort_enforcement, plan, false);
 
@@ -2180,8 +2189,8 @@ fn repartition_ignores_transitively_with_projection() -> 
Result<()> {
     let expected = &[
         "SortRequiredExec: [c@2 ASC]",
         // Since this projection is trivial, increasing parallelism is not 
beneficial
-        "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
-        "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
+        "  ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
+        "    DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
     assert_optimized!(expected, plan.clone(), true);
     assert_optimized!(expected, plan, false);
@@ -2213,8 +2222,8 @@ fn repartition_transitively_past_sort_with_projection() 
-> Result<()> {
     let expected = &[
         "SortExec: expr=[c@2 ASC], preserve_partitioning=[true]",
         // Since this projection is trivial, increasing parallelism is not 
beneficial
-        "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "  ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
+        "    DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
     ];
     assert_optimized!(expected, plan.clone(), true);
     assert_optimized!(expected, plan, false);
@@ -2233,22 +2242,22 @@ fn repartition_transitively_past_sort_with_filter() -> 
Result<()> {
 
     let expected = &[
         "SortPreservingMergeExec: [a@0 ASC]",
-        "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
+        "  SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
         // Expect repartition on the input to the sort (as it can benefit from 
additional parallelism)
-        "FilterExec: c@2 = 0",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "    FilterExec: c@2 = 0",
+        "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], file_type=parquet",
     ];
 
     assert_optimized!(expected, plan.clone(), true);
 
     let expected_first_sort_enforcement = &[
         "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
-        "CoalescePartitionsExec",
-        "FilterExec: c@2 = 0",
+        "  CoalescePartitionsExec",
+        "    FilterExec: c@2 = 0",
         // Expect repartition on the input of the filter (as it can benefit 
from additional parallelism)
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], file_type=parquet",
     ];
     assert_optimized!(expected_first_sort_enforcement, plan, false);
 
@@ -2279,23 +2288,23 @@ fn 
repartition_transitively_past_sort_with_projection_and_filter() -> Result<()>
     let expected = &[
         "SortPreservingMergeExec: [a@0 ASC]",
         // Expect repartition on the input to the sort (as it can benefit from 
additional parallelism)
-        "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
-        "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
-        "FilterExec: c@2 = 0",
+        "  SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
+        "    ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
+        "      FilterExec: c@2 = 0",
         // repartition is lowest down
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
 
     assert_optimized!(expected, plan.clone(), true);
 
     let expected_first_sort_enforcement = &[
         "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
-        "CoalescePartitionsExec",
-        "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
-        "FilterExec: c@2 = 0",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "  CoalescePartitionsExec",
+        "    ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
+        "      FilterExec: c@2 = 0",
+        "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
     assert_optimized!(expected_first_sort_enforcement, plan, false);
 
@@ -2310,15 +2319,15 @@ fn parallelization_single_partition() -> Result<()> {
 
     let expected_parquet = [
         "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
-        "RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
-        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
-        "DataSourceExec: file_groups={2 groups: [[x:0..50], [x:50..100]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+        "  RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
+        "    AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
+        "      DataSourceExec: file_groups={2 groups: [[x:0..50], 
[x:50..100]]}, projection=[a, b, c, d, e], file_type=parquet",
     ];
     let expected_csv = [
         "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
-        "RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
-        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
-        "DataSourceExec: file_groups={2 groups: [[x:0..50], [x:50..100]]}, 
projection=[a, b, c, d, e], file_type=csv, has_header=false",
+        "  RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
+        "    AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
+        "      DataSourceExec: file_groups={2 groups: [[x:0..50], 
[x:50..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false",
     ];
     assert_optimized!(expected_parquet, plan_parquet, true, false, 2, true, 
10);
     assert_optimized!(expected_csv, plan_csv, true, false, 2, true, 10);
@@ -2342,8 +2351,8 @@ fn parallelization_multiple_files() -> Result<()> {
     // https://github.com/apache/datafusion/issues/8451
     let expected = [
         "SortRequiredExec: [a@0 ASC]",
-        "FilterExec: c@2 = 0",
-        "DataSourceExec: file_groups={3 groups: [[x:0..50], [y:0..100], 
[x:50..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], 
file_type=parquet",        ];
+        "  FilterExec: c@2 = 0",
+        "    DataSourceExec: file_groups={3 groups: [[x:0..50], [y:0..100], 
[x:50..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], 
file_type=parquet",        ];
     let target_partitions = 3;
     let repartition_size = 1;
     assert_optimized!(
@@ -2359,8 +2368,8 @@ fn parallelization_multiple_files() -> Result<()> {
 
     let expected = [
         "SortRequiredExec: [a@0 ASC]",
-        "FilterExec: c@2 = 0",
-        "DataSourceExec: file_groups={8 groups: [[x:0..25], [y:0..25], 
[x:25..50], [y:25..50], [x:50..75], [y:50..75], [x:75..100], [y:75..100]]}, 
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
+        "  FilterExec: c@2 = 0",
+        "    DataSourceExec: file_groups={8 groups: [[x:0..25], [y:0..25], 
[x:25..50], [y:25..50], [x:50..75], [y:50..75], [x:75..100], [y:75..100]]}, 
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
     ];
     let target_partitions = 8;
     let repartition_size = 1;
@@ -2392,17 +2401,17 @@ fn parallelization_compressed_csv() -> Result<()> {
 
     let expected_not_partitioned = [
         "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
-        "RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
-        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
-        "RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=csv, has_header=false",
+        "  RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
+        "    AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
+        "      RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1",
+        "        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], file_type=csv, has_header=false",
     ];
 
     let expected_partitioned = [
         "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
-        "RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
-        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
-        "DataSourceExec: file_groups={2 groups: [[x:0..50], [x:50..100]]}, 
projection=[a, b, c, d, e], file_type=csv, has_header=false",
+        "  RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
+        "    AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
+        "      DataSourceExec: file_groups={2 groups: [[x:0..50], 
[x:50..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false",
     ];
 
     for compression_type in compression_types {
@@ -2436,17 +2445,17 @@ fn parallelization_two_partitions() -> Result<()> {
 
     let expected_parquet = [
         "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
-        "RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
-        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
+        "  RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
+        "    AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
         // Plan already has two partitions
-        "DataSourceExec: file_groups={2 groups: [[x:0..100], [y:0..100]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+        "      DataSourceExec: file_groups={2 groups: [[x:0..100], 
[y:0..100]]}, projection=[a, b, c, d, e], file_type=parquet",
     ];
     let expected_csv = [
         "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
-        "RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
-        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
+        "  RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
+        "    AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
         // Plan already has two partitions
-        "DataSourceExec: file_groups={2 groups: [[x:0..100], [y:0..100]]}, 
projection=[a, b, c, d, e], file_type=csv, has_header=false",
+        "      DataSourceExec: file_groups={2 groups: [[x:0..100], 
[y:0..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false",
     ];
     assert_optimized!(expected_parquet, plan_parquet, true, false, 2, true, 
10);
     assert_optimized!(expected_csv, plan_csv, true, false, 2, true, 10);
@@ -2461,17 +2470,17 @@ fn parallelization_two_partitions_into_four() -> 
Result<()> {
 
     let expected_parquet = [
         "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
-        "RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4",
-        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
+        "  RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4",
+        "    AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
         // Multiple source files splitted across partitions
-        "DataSourceExec: file_groups={4 groups: [[x:0..50], [x:50..100], 
[y:0..50], [y:50..100]]}, projection=[a, b, c, d, e], file_type=parquet",
+        "      DataSourceExec: file_groups={4 groups: [[x:0..50], [x:50..100], 
[y:0..50], [y:50..100]]}, projection=[a, b, c, d, e], file_type=parquet",
     ];
     let expected_csv = [
         "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
-        "RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4",
-        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
+        "  RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4",
+        "    AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
         // Multiple source files splitted across partitions
-        "DataSourceExec: file_groups={4 groups: [[x:0..50], [x:50..100], 
[y:0..50], [y:50..100]]}, projection=[a, b, c, d, e], file_type=csv, 
has_header=false",
+        "      DataSourceExec: file_groups={4 groups: [[x:0..50], [x:50..100], 
[y:0..50], [y:50..100]]}, projection=[a, b, c, d, e], file_type=csv, 
has_header=false",
     ];
     assert_optimized!(expected_parquet, plan_parquet, true, false, 4, true, 
10);
     assert_optimized!(expected_csv, plan_csv, true, false, 4, true, 10);
@@ -2491,19 +2500,19 @@ fn parallelization_sorted_limit() -> Result<()> {
 
     let expected_parquet = &[
         "GlobalLimitExec: skip=0, fetch=100",
-        "LocalLimitExec: fetch=100",
+        "  LocalLimitExec: fetch=100",
         // data is sorted so can't repartition here
-        "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
+        "    SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
         // Doesn't parallelize for SortExec without preserve_partitioning
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
     ];
     let expected_csv = &[
         "GlobalLimitExec: skip=0, fetch=100",
-        "LocalLimitExec: fetch=100",
+        "  LocalLimitExec: fetch=100",
         // data is sorted so can't repartition here
-        "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
+        "    SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
         // Doesn't parallelize for SortExec without preserve_partitioning
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=csv, has_header=false",
+        "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=csv, has_header=false",
     ];
     assert_optimized!(expected_parquet, plan_parquet, true);
     assert_optimized!(expected_csv, plan_csv, true);
@@ -2527,27 +2536,27 @@ fn parallelization_limit_with_filter() -> Result<()> {
 
     let expected_parquet = &[
         "GlobalLimitExec: skip=0, fetch=100",
-        "CoalescePartitionsExec",
-        "LocalLimitExec: fetch=100",
-        "FilterExec: c@2 = 0",
+        "  CoalescePartitionsExec",
+        "    LocalLimitExec: fetch=100",
+        "      FilterExec: c@2 = 0",
         // even though data is sorted, we can use repartition here. Since
         // ordering is not used in subsequent stages anyway.
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
+        "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "          SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
         // SortExec doesn't benefit from input partitioning
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "            DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
     let expected_csv = &[
         "GlobalLimitExec: skip=0, fetch=100",
-        "CoalescePartitionsExec",
-        "LocalLimitExec: fetch=100",
-        "FilterExec: c@2 = 0",
+        "  CoalescePartitionsExec",
+        "    LocalLimitExec: fetch=100",
+        "      FilterExec: c@2 = 0",
         // even though data is sorted, we can use repartition here. Since
         // ordering is not used in subsequent stages anyway.
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
+        "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "          SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
         // SortExec doesn't benefit from input partitioning
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=csv, has_header=false",
+        "            DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=csv, has_header=false",
     ];
     assert_optimized!(expected_parquet, plan_parquet, true);
     assert_optimized!(expected_csv, plan_csv, true);
@@ -2567,35 +2576,35 @@ fn parallelization_ignores_limit() -> Result<()> {
 
     let expected_parquet = &[
         "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
-        "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
-        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "GlobalLimitExec: skip=0, fetch=100",
-        "CoalescePartitionsExec",
-        "LocalLimitExec: fetch=100",
-        "FilterExec: c@2 = 0",
+        "  RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
+        "    AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
+        "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "        GlobalLimitExec: skip=0, fetch=100",
+        "          CoalescePartitionsExec",
+        "            LocalLimitExec: fetch=100",
+        "              FilterExec: c@2 = 0",
         // repartition should happen prior to the filter to maximize 
parallelism
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "GlobalLimitExec: skip=0, fetch=100",
+        "                RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "                  GlobalLimitExec: skip=0, fetch=100",
         // Limit doesn't benefit from input partitioning - no parallelism
-        "LocalLimitExec: fetch=100",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "                    LocalLimitExec: fetch=100",
+        "                      DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
     let expected_csv = &[
         "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
-        "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
-        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "GlobalLimitExec: skip=0, fetch=100",
-        "CoalescePartitionsExec",
-        "LocalLimitExec: fetch=100",
-        "FilterExec: c@2 = 0",
+        "  RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
+        "    AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
+        "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "        GlobalLimitExec: skip=0, fetch=100",
+        "          CoalescePartitionsExec",
+        "            LocalLimitExec: fetch=100",
+        "              FilterExec: c@2 = 0",
         // repartition should happen prior to the filter to maximize 
parallelism
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "GlobalLimitExec: skip=0, fetch=100",
+        "                RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "                  GlobalLimitExec: skip=0, fetch=100",
         // Limit doesn't benefit from input partitioning - no parallelism
-        "LocalLimitExec: fetch=100",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=csv, has_header=false",
+        "                    LocalLimitExec: fetch=100",
+        "                      DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=csv, has_header=false",
     ];
     assert_optimized!(expected_parquet, plan_parquet, true);
     assert_optimized!(expected_csv, plan_csv, true);
@@ -2611,20 +2620,20 @@ fn parallelization_union_inputs() -> Result<()> {
     let expected_parquet = &[
         "UnionExec",
         // Union doesn't benefit from input partitioning - no parallelism
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=parquet",
+        "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=parquet",
+        "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=parquet",
+        "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=parquet",
+        "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=parquet",
     ];
     let expected_csv = &[
         "UnionExec",
         // Union doesn't benefit from input partitioning - no parallelism
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=csv, has_header=false",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=csv, has_header=false",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=csv, has_header=false",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=csv, has_header=false",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=csv, has_header=false",
+        "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=csv, has_header=false",
+        "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=csv, has_header=false",
+        "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=csv, has_header=false",
+        "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=csv, has_header=false",
+        "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=csv, has_header=false",
     ];
     assert_optimized!(expected_parquet, plan_parquet, true);
     assert_optimized!(expected_csv, plan_csv, true);
@@ -2678,15 +2687,15 @@ fn parallelization_sort_preserving_merge_with_union() 
-> Result<()> {
     // should not sort (as the data was already sorted)
     let expected_parquet = &[
         "SortPreservingMergeExec: [c@2 ASC]",
-        "UnionExec",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], output_ordering=[c@2 ASC], file_type=parquet",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], output_ordering=[c@2 ASC], file_type=parquet",
+        "  UnionExec",
+        "    DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
+        "    DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
     let expected_csv = &[
         "SortPreservingMergeExec: [c@2 ASC]",
-        "UnionExec",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
+        "  UnionExec",
+        "    DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
+        "    DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
     ];
     assert_optimized!(expected_parquet, plan_parquet, true);
     assert_optimized!(expected_csv, plan_csv, true);
@@ -2713,11 +2722,11 @@ fn parallelization_does_not_benefit() -> Result<()> {
     // no parallelization, because SortRequiredExec doesn't benefit from 
increased parallelism
     let expected_parquet = &[
         "SortRequiredExec: [c@2 ASC]",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], output_ordering=[c@2 ASC], file_type=parquet",
+        "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
     let expected_csv = &[
         "SortRequiredExec: [c@2 ASC]",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
+        "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
     ];
     assert_optimized!(expected_parquet, plan_parquet, true);
     assert_optimized!(expected_csv, plan_csv, true);
@@ -2757,7 +2766,7 @@ fn 
parallelization_ignores_transitively_with_projection_parquet() -> Result<()>
     // data should not be repartitioned / resorted
     let expected_parquet = &[
         "ProjectionExec: expr=[a@0 as a2, c@2 as c2]",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], output_ordering=[c@2 ASC], file_type=parquet",
+        "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
     assert_optimized!(expected_parquet, plan_parquet, true);
 
@@ -2796,7 +2805,7 @@ fn 
parallelization_ignores_transitively_with_projection_csv() -> Result<()> {
     // data should not be repartitioned / resorted
     let expected_csv = &[
         "ProjectionExec: expr=[a@0 as a2, c@2 as c2]",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
+        "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
     ];
     assert_optimized!(expected_csv, plan_csv, true);
 
@@ -2819,8 +2828,8 @@ fn remove_redundant_roundrobins() -> Result<()> {
 
     let expected = &[
         "FilterExec: c@2 = 0",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "  RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "    DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
     ];
     assert_optimized!(expected, physical_plan.clone(), true);
     assert_optimized!(expected, physical_plan, false);
@@ -2842,9 +2851,9 @@ fn remove_unnecessary_spm_after_filter() -> Result<()> {
     // This is still satisfied since, after filter that column is constant.
     let expected = &[
         "CoalescePartitionsExec",
-        "FilterExec: c@2 = 0",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2, preserve_order=true, sort_exprs=c@2 ASC",
-        "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
+        "  FilterExec: c@2 = 0",
+        "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2, preserve_order=true, sort_exprs=c@2 ASC",
+        "      DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
     // last flag sets config.optimizer.PREFER_EXISTING_SORT
     assert_optimized!(expected, physical_plan.clone(), true, true);
@@ -2865,9 +2874,9 @@ fn preserve_ordering_through_repartition() -> Result<()> {
 
     let expected = &[
         "SortPreservingMergeExec: [d@3 ASC]",
-        "FilterExec: c@2 = 0",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2, preserve_order=true, sort_exprs=d@3 ASC",
-        "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, 
c, d, e], output_ordering=[d@3 ASC], file_type=parquet",
+        "  FilterExec: c@2 = 0",
+        "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2, preserve_order=true, sort_exprs=d@3 ASC",
+        "      DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[d@3 ASC], file_type=parquet",
     ];
     // last flag sets config.optimizer.PREFER_EXISTING_SORT
     assert_optimized!(expected, physical_plan.clone(), true, true);
@@ -2888,20 +2897,20 @@ fn do_not_preserve_ordering_through_repartition() -> 
Result<()> {
 
     let expected = &[
         "SortPreservingMergeExec: [a@0 ASC]",
-        "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
-        "FilterExec: c@2 = 0",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2",
-        "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, 
c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
+        "  SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
+        "    FilterExec: c@2 = 0",
+        "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2",
+        "        DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
     ];
 
     assert_optimized!(expected, physical_plan.clone(), true);
 
     let expected = &[
         "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
-        "CoalescePartitionsExec",
-        "FilterExec: c@2 = 0",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2",
-        "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, 
c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
+        "  CoalescePartitionsExec",
+        "    FilterExec: c@2 = 0",
+        "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2",
+        "        DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
     ];
     assert_optimized!(expected, physical_plan, false);
 
@@ -2922,9 +2931,9 @@ fn no_need_for_sort_after_filter() -> Result<()> {
         // After CoalescePartitionsExec c is still constant. Hence c@2 ASC 
ordering is already satisfied.
         "CoalescePartitionsExec",
         // Since after this stage c is constant. c@2 ASC ordering is already 
satisfied.
-        "FilterExec: c@2 = 0",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2",
-        "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
+        "  FilterExec: c@2 = 0",
+        "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2",
+        "      DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
     assert_optimized!(expected, physical_plan.clone(), true);
     assert_optimized!(expected, physical_plan, false);
@@ -2949,21 +2958,21 @@ fn do_not_preserve_ordering_through_repartition2() -> 
Result<()> {
 
     let expected = &[
         "SortPreservingMergeExec: [a@0 ASC]",
-        "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
-        "FilterExec: c@2 = 0",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2",
-        "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
+        "  SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
+        "    FilterExec: c@2 = 0",
+        "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2",
+        "        DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
 
     assert_optimized!(expected, physical_plan.clone(), true);
 
     let expected = &[
         "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
-        "CoalescePartitionsExec",
-        "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
-        "FilterExec: c@2 = 0",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2",
-        "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
+        "  CoalescePartitionsExec",
+        "    SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
+        "      FilterExec: c@2 = 0",
+        "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2",
+        "          DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
     assert_optimized!(expected, physical_plan, false);
 
@@ -2982,8 +2991,8 @@ fn do_not_preserve_ordering_through_repartition3() -> 
Result<()> {
 
     let expected = &[
         "FilterExec: c@2 = 0",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2",
-        "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
+        "  RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2",
+        "    DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
     assert_optimized!(expected, physical_plan.clone(), true);
     assert_optimized!(expected, physical_plan, false);
@@ -3004,8 +3013,8 @@ fn do_not_put_sort_when_input_is_invalid() -> Result<()> {
         // Ordering requirement of sort required exec is NOT satisfied
         // by existing ordering at the source.
         "SortRequiredExec: [a@0 ASC]",
-        "FilterExec: c@2 = 0",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "  FilterExec: c@2 = 0",
+        "    DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
     ];
     assert_plan_txt!(expected, physical_plan);
 
@@ -3013,9 +3022,9 @@ fn do_not_put_sort_when_input_is_invalid() -> Result<()> {
         "SortRequiredExec: [a@0 ASC]",
         // Since at the start of the rule ordering requirement is not satisfied
         // EnforceDistribution rule doesn't satisfy this requirement either.
-        "FilterExec: c@2 = 0",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "  FilterExec: c@2 = 0",
+        "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
     ];
 
     let mut config = ConfigOptions::new();
@@ -3042,8 +3051,8 @@ fn put_sort_when_input_is_valid() -> Result<()> {
         // Ordering requirement of sort required exec is satisfied
         // by existing ordering at the source.
         "SortRequiredExec: [a@0 ASC]",
-        "FilterExec: c@2 = 0",
-        "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, 
c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
+        "  FilterExec: c@2 = 0",
+        "    DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
     ];
     assert_plan_txt!(expected, physical_plan);
 
@@ -3051,8 +3060,8 @@ fn put_sort_when_input_is_valid() -> Result<()> {
         // Since at the start of the rule ordering requirement is satisfied
         // EnforceDistribution rule satisfy this requirement also.
         "SortRequiredExec: [a@0 ASC]",
-        "FilterExec: c@2 = 0",
-        "DataSourceExec: file_groups={10 groups: [[x:0..20], [y:0..20], 
[x:20..40], [y:20..40], [x:40..60], [y:40..60], [x:60..80], [y:60..80], 
[x:80..100], [y:80..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 
ASC], file_type=parquet",
+        "  FilterExec: c@2 = 0",
+        "    DataSourceExec: file_groups={10 groups: [[x:0..20], [y:0..20], 
[x:20..40], [y:20..40], [x:40..60], [y:40..60], [x:60..80], [y:60..80], 
[x:80..100], [y:80..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 
ASC], file_type=parquet",
     ];
 
     let mut config = ConfigOptions::new();
@@ -3078,8 +3087,8 @@ fn do_not_add_unnecessary_hash() -> Result<()> {
 
     let expected = &[
         "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
-        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], output_ordering=[c@2 ASC], file_type=parquet",
+        "  AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
+        "    DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
     // Make sure target partition number is 1. In this case hash repartition 
is unnecessary
     assert_optimized!(expected, physical_plan.clone(), true, false, 1, false, 
1024);
@@ -3104,12 +3113,12 @@ fn do_not_add_unnecessary_hash2() -> Result<()> {
         "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
         // Since hash requirements of this operator is satisfied. There 
shouldn't be
         // a hash repartition here
-        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
-        "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
-        "RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4",
-        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
-        "RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2",
-        "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
+        "  AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
+        "    AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
+        "      RepartitionExec: partitioning=Hash([a@0], 4), 
input_partitions=4",
+        "        AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
+        "          RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=2",
+        "            DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
     // Make sure target partition number is larger than 2 (e.g partition 
number at the source).
     assert_optimized!(expected, physical_plan.clone(), true, false, 4, false, 
1024);
@@ -3153,9 +3162,9 @@ fn optimize_away_unnecessary_repartition2() -> Result<()> 
{
 
     let expected = &[
         "FilterExec: c@2 = 0",
-        "FilterExec: c@2 = 0",
-        "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet",
+        "  FilterExec: c@2 = 0",
+        "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+        "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
     ];
     assert_optimized!(expected, physical_plan.clone(), true);
     assert_optimized!(expected, physical_plan, false);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to