This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 0c177d18dc Use shorter aliases in CSE (#10939)
0c177d18dc is described below

commit 0c177d18dc57fe81a7a23e1b1a41991dc88bb4f6
Author: Peter Toth <[email protected]>
AuthorDate: Tue Jun 18 12:25:28 2024 +0200

    Use shorter aliases in CSE (#10939)
    
    * initial change
    
    * test renaming
    
    * use counter instead of indexmap
    
    * order slt tests
    
    * change cse tests
    
    * restore slt tests
    
    * fix slt test
    
    * formatting
    
    * ensure no alias collision
    
    * keep original alias numbers for collision
    
    * ensure no collision in aggregate cse
    
    * use `AliasGenerator` to generate aliases, use `__cse` prefix in common 
expression aliases, remove `DataType` from `ExprStats` as not needed, store 
aliases in `CommonExprs`, revert unnecessary changes
    
    * use `into_values()` instead of `into_iter()` where possible
    
    * fix docstring of `ExprStats` and `CommonExprs`
    
    * use `__common_expr` prefix
    
    ---------
    
    Co-authored-by: Mohamed Abdeen <[email protected]>
---
 .../optimizer/src/common_subexpr_eliminate.rs      | 368 +++++++++++----------
 datafusion/sqllogictest/test_files/group_by.slt    |   8 +-
 datafusion/sqllogictest/test_files/select.slt      |  16 +-
 datafusion/sqllogictest/test_files/subquery.slt    |   8 +-
 .../sqllogictest/test_files/tpch/q1.slt.part       |   6 +-
 datafusion/sqllogictest/test_files/window.slt      | 126 +++----
 6 files changed, 271 insertions(+), 261 deletions(-)

diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs 
b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index 3ed1309f15..e150a957bf 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -22,19 +22,19 @@ use std::sync::Arc;
 
 use crate::{utils, OptimizerConfig, OptimizerRule};
 
-use arrow::datatypes::{DataType, Field};
+use datafusion_common::alias::AliasGenerator;
 use datafusion_common::tree_node::{
     Transformed, TransformedResult, TreeNode, TreeNodeRecursion, 
TreeNodeRewriter,
     TreeNodeVisitor,
 };
-use datafusion_common::{
-    internal_err, qualified_name, Column, DFSchema, DFSchemaRef, 
DataFusionError, Result,
-};
+use datafusion_common::{qualified_name, Column, DFSchema, DataFusionError, 
Result};
 use datafusion_expr::expr::Alias;
 use datafusion_expr::logical_plan::{Aggregate, LogicalPlan, Projection, 
Window};
 use datafusion_expr::{col, Expr, ExprSchemable};
 use indexmap::IndexMap;
 
+const CSE_PREFIX: &str = "__common_expr";
+
 /// Identifier that represents a subexpression tree.
 ///
 /// Note that the current implementation contains:
@@ -79,16 +79,12 @@ type Identifier = String;
 /// ```
 type IdArray = Vec<(usize, Identifier)>;
 
-/// A map that contains statistics of expressions by their identifiers.
-/// It contains:
-/// - The number of occurrences and
-/// - The DataType
-/// of an expression.
-type ExprStats = HashMap<Identifier, (usize, DataType)>;
+/// A map that contains the number of occurrences of expressions by their 
identifiers.
+type ExprStats = HashMap<Identifier, usize>;
 
-/// A map that contains the common expressions extracted during the second, 
rewriting
-/// traversal.
-type CommonExprs = IndexMap<Identifier, Expr>;
+/// A map that contains the common expressions and their alias extracted 
during the
+/// second, rewriting traversal.
+type CommonExprs = IndexMap<Identifier, (Expr, String)>;
 
 /// Performs Common Sub-expression Elimination optimization.
 ///
@@ -131,6 +127,7 @@ impl CommonSubexprEliminate {
         arrays_list: &[&[IdArray]],
         expr_stats: &ExprStats,
         common_exprs: &mut CommonExprs,
+        alias_generator: &AliasGenerator,
     ) -> Result<Vec<Vec<Expr>>> {
         exprs_list
             .iter()
@@ -141,7 +138,13 @@ impl CommonSubexprEliminate {
                     .cloned()
                     .zip(arrays.iter())
                     .map(|(expr, id_array)| {
-                        replace_common_expr(expr, id_array, expr_stats, 
common_exprs)
+                        replace_common_expr(
+                            expr,
+                            id_array,
+                            expr_stats,
+                            common_exprs,
+                            alias_generator,
+                        )
                     })
                     .collect::<Result<Vec<_>>>()
             })
@@ -164,21 +167,21 @@ impl CommonSubexprEliminate {
         expr_stats: &ExprStats,
         config: &dyn OptimizerConfig,
     ) -> Result<(Vec<Vec<Expr>>, LogicalPlan)> {
-        let mut common_exprs = IndexMap::new();
-
+        let mut common_exprs = CommonExprs::new();
         let rewrite_exprs = self.rewrite_exprs_list(
             exprs_list,
             arrays_list,
             expr_stats,
             &mut common_exprs,
+            &config.alias_generator(),
         )?;
 
         let mut new_input = self
             .try_optimize(input, config)?
             .unwrap_or_else(|| input.clone());
+
         if !common_exprs.is_empty() {
-            new_input =
-                build_common_expr_project_plan(new_input, common_exprs, 
expr_stats)?;
+            new_input = build_common_expr_project_plan(new_input, 
common_exprs)?;
         }
 
         Ok((rewrite_exprs, new_input))
@@ -209,13 +212,7 @@ impl CommonSubexprEliminate {
             } = window;
             plan = input.as_ref().clone();
 
-            let input_schema = Arc::clone(input.schema());
-            let arrays = to_arrays(
-                &window_expr,
-                input_schema,
-                &mut expr_stats,
-                ExprMask::Normal,
-            )?;
+            let arrays = to_arrays(&window_expr, &mut expr_stats, 
ExprMask::Normal)?;
 
             window_exprs.push(window_expr);
             arrays_per_window.push(arrays);
@@ -277,15 +274,8 @@ impl CommonSubexprEliminate {
         let mut expr_stats = ExprStats::new();
 
         // rewrite inputs
-        let input_schema = Arc::clone(input.schema());
-        let group_arrays = to_arrays(
-            group_expr,
-            Arc::clone(&input_schema),
-            &mut expr_stats,
-            ExprMask::Normal,
-        )?;
-        let aggr_arrays =
-            to_arrays(aggr_expr, input_schema, &mut expr_stats, 
ExprMask::Normal)?;
+        let group_arrays = to_arrays(group_expr, &mut expr_stats, 
ExprMask::Normal)?;
+        let aggr_arrays = to_arrays(aggr_expr, &mut expr_stats, 
ExprMask::Normal)?;
 
         let (mut new_expr, new_input) = self.rewrite_expr(
             &[group_expr, aggr_expr],
@@ -303,16 +293,16 @@ impl CommonSubexprEliminate {
         let new_input_schema = Arc::clone(new_input.schema());
         let aggr_arrays = to_arrays(
             &new_aggr_expr,
-            new_input_schema.clone(),
             &mut expr_stats,
             ExprMask::NormalAndAggregates,
         )?;
-        let mut common_exprs = IndexMap::new();
+        let mut common_exprs = CommonExprs::new();
         let mut rewritten = self.rewrite_exprs_list(
             &[&new_aggr_expr],
             &[&aggr_arrays],
             &expr_stats,
             &mut common_exprs,
+            &config.alias_generator(),
         )?;
         let rewritten = pop_expr(&mut rewritten)?;
 
@@ -330,11 +320,8 @@ impl CommonSubexprEliminate {
                 .map(LogicalPlan::Aggregate)
         } else {
             let mut agg_exprs = common_exprs
-                .into_iter()
-                .map(|(expr_id, expr)| {
-                    // todo: check `nullable`
-                    expr.alias(expr_id)
-                })
+                .into_values()
+                .map(|(expr, expr_alias)| expr.alias(expr_alias))
                 .collect::<Vec<_>>();
 
             let mut proj_exprs = vec![];
@@ -347,14 +334,15 @@ impl CommonSubexprEliminate {
                         agg_exprs.push(expr.alias(&name));
                         proj_exprs.push(Expr::Column(Column::from_name(name)));
                     } else {
-                        let id = expr_identifier(&expr_rewritten, 
"".to_string());
+                        let expr_alias = 
config.alias_generator().next(CSE_PREFIX);
                         let (qualifier, field) =
                             expr_rewritten.to_field(&new_input_schema)?;
                         let out_name = qualified_name(qualifier.as_ref(), 
field.name());
 
-                        agg_exprs.push(expr_rewritten.alias(&id));
-                        proj_exprs
-                            
.push(Expr::Column(Column::from_name(id)).alias(out_name));
+                        agg_exprs.push(expr_rewritten.alias(&expr_alias));
+                        proj_exprs.push(
+                            
Expr::Column(Column::from_name(expr_alias)).alias(out_name),
+                        );
                     }
                 } else {
                     proj_exprs.push(expr_rewritten);
@@ -382,11 +370,10 @@ impl CommonSubexprEliminate {
         let expr = plan.expressions();
         let inputs = plan.inputs();
         let input = inputs[0];
-        let input_schema = Arc::clone(input.schema());
         let mut expr_stats = ExprStats::new();
 
         // Visit expr list and build expr identifier to occuring count map 
(`expr_stats`).
-        let arrays = to_arrays(&expr, input_schema, &mut expr_stats, 
ExprMask::Normal)?;
+        let arrays = to_arrays(&expr, &mut expr_stats, ExprMask::Normal)?;
 
         let (mut new_expr, new_input) =
             self.rewrite_expr(&[&expr], &[&arrays], input, &expr_stats, 
config)?;
@@ -477,20 +464,13 @@ fn pop_expr(new_expr: &mut Vec<Vec<Expr>>) -> 
Result<Vec<Expr>> {
 
 fn to_arrays(
     expr: &[Expr],
-    input_schema: DFSchemaRef,
     expr_stats: &mut ExprStats,
     expr_mask: ExprMask,
 ) -> Result<Vec<IdArray>> {
     expr.iter()
         .map(|e| {
             let mut id_array = vec![];
-            expr_to_identifier(
-                e,
-                expr_stats,
-                &mut id_array,
-                Arc::clone(&input_schema),
-                expr_mask,
-            )?;
+            expr_to_identifier(e, expr_stats, &mut id_array, expr_mask)?;
 
             Ok(id_array)
         })
@@ -510,19 +490,13 @@ fn to_arrays(
 fn build_common_expr_project_plan(
     input: LogicalPlan,
     common_exprs: CommonExprs,
-    expr_stats: &ExprStats,
 ) -> Result<LogicalPlan> {
     let mut fields_set = BTreeSet::new();
     let mut project_exprs = common_exprs
-        .into_iter()
-        .map(|(expr_id, expr)| {
-            let Some((_, data_type)) = expr_stats.get(&expr_id) else {
-                return internal_err!("expr_stats invalid state");
-            };
-            // todo: check `nullable`
-            let field = Field::new(&expr_id, data_type.clone(), true);
-            fields_set.insert(field.name().to_owned());
-            Ok(expr.alias(expr_id))
+        .into_values()
+        .map(|(expr, expr_alias)| {
+            fields_set.insert(expr_alias.clone());
+            Ok(expr.alias(expr_alias))
         })
         .collect::<Result<Vec<_>>>()?;
 
@@ -637,9 +611,6 @@ struct ExprIdentifierVisitor<'a> {
     expr_stats: &'a mut ExprStats,
     // cache to speed up second traversal
     id_array: &'a mut IdArray,
-    // input schema for the node that we're optimizing, so we can determine 
the correct datatype
-    // for each subexpression
-    input_schema: DFSchemaRef,
     // inner states
     visit_stack: Vec<VisitRecord>,
     // preorder index, start from 0.
@@ -714,14 +685,7 @@ impl<'n> TreeNodeVisitor<'n> for ExprIdentifierVisitor<'_> 
{
         self.id_array[down_index].0 = self.up_index;
         if !self.expr_mask.ignores(expr) {
             self.id_array[down_index].1.clone_from(&expr_id);
-
-            // TODO: can we capture the data type in the second traversal only 
for
-            //  replaced expressions?
-            let data_type = expr.get_type(&self.input_schema)?;
-            let (count, _) = self
-                .expr_stats
-                .entry(expr_id.clone())
-                .or_insert((0, data_type));
+            let count = self.expr_stats.entry(expr_id.clone()).or_insert(0);
             *count += 1;
         }
         self.visit_stack.push(VisitRecord::ExprItem(expr_id));
@@ -740,13 +704,11 @@ fn expr_to_identifier(
     expr: &Expr,
     expr_stats: &mut ExprStats,
     id_array: &mut IdArray,
-    input_schema: DFSchemaRef,
     expr_mask: ExprMask,
 ) -> Result<()> {
     expr.visit(&mut ExprIdentifierVisitor {
         expr_stats,
         id_array,
-        input_schema,
         visit_stack: vec![],
         down_index: 0,
         up_index: 0,
@@ -771,6 +733,8 @@ struct CommonSubexprRewriter<'a> {
     down_index: usize,
     // how many aliases have we seen so far
     alias_counter: usize,
+    // alias generator for extracted common expressions
+    alias_generator: &'a AliasGenerator,
 }
 
 impl TreeNodeRewriter for CommonSubexprRewriter<'_> {
@@ -780,17 +744,18 @@ impl TreeNodeRewriter for CommonSubexprRewriter<'_> {
         if matches!(expr, Expr::Alias(_)) {
             self.alias_counter -= 1
         }
+
         Ok(Transformed::no(expr))
     }
 
     fn f_down(&mut self, expr: Expr) -> Result<Transformed<Expr>> {
-        // The `CommonSubexprRewriter` relies on `ExprIdentifierVisitor` to 
generate
-        // the `id_array`, which records the expr's identifier used to rewrite 
expr. So if we
-        // skip an expr in `ExprIdentifierVisitor`, we should skip it here, 
too.
         if matches!(expr, Expr::Alias(_)) {
             self.alias_counter += 1;
         }
 
+        // The `CommonSubexprRewriter` relies on `ExprIdentifierVisitor` to 
generate
+        // the `id_array`, which records the expr's identifier used to rewrite 
expr. So if we
+        // skip an expr in `ExprIdentifierVisitor`, we should skip it here, 
too.
         if expr.short_circuits() || expr.is_volatile()? {
             return Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump));
         }
@@ -803,8 +768,8 @@ impl TreeNodeRewriter for CommonSubexprRewriter<'_> {
             return Ok(Transformed::no(expr));
         }
 
-        let (counter, _) = self.expr_stats.get(expr_id).unwrap();
-        if *counter > 1 {
+        let count = self.expr_stats.get(expr_id).unwrap();
+        if *count > 1 {
             // step index to skip all sub-node (which has smaller series 
number).
             while self.down_index < self.id_array.len()
                 && self.id_array[self.down_index].0 < *up_index
@@ -813,14 +778,18 @@ impl TreeNodeRewriter for CommonSubexprRewriter<'_> {
             }
 
             let expr_name = expr.display_name()?;
-            self.common_exprs.insert(expr_id.clone(), expr);
+            let (_, expr_alias) =
+                self.common_exprs.entry(expr_id.clone()).or_insert_with(|| {
+                    let expr_alias = self.alias_generator.next(CSE_PREFIX);
+                    (expr, expr_alias)
+                });
 
             // alias the expressions without an `Alias` ancestor node
             let rewritten = if self.alias_counter > 0 {
-                col(expr_id)
+                col(expr_alias.clone())
             } else {
                 self.alias_counter += 1;
-                col(expr_id).alias(expr_name)
+                col(expr_alias.clone()).alias(expr_name)
             };
 
             Ok(Transformed::new(rewritten, true, TreeNodeRecursion::Jump))
@@ -837,6 +806,7 @@ fn replace_common_expr(
     id_array: &IdArray,
     expr_stats: &ExprStats,
     common_exprs: &mut CommonExprs,
+    alias_generator: &AliasGenerator,
 ) -> Result<Expr> {
     expr.rewrite(&mut CommonSubexprRewriter {
         expr_stats,
@@ -844,6 +814,7 @@ fn replace_common_expr(
         common_exprs,
         down_index: 0,
         alias_counter: 0,
+        alias_generator,
     })
     .data()
 }
@@ -852,7 +823,7 @@ fn replace_common_expr(
 mod test {
     use std::iter;
 
-    use arrow::datatypes::Schema;
+    use arrow::datatypes::{DataType, Field, Schema};
 
     use datafusion_expr::logical_plan::{table_scan, JoinType};
 
@@ -868,10 +839,16 @@ mod test {
 
     use super::*;
 
-    fn assert_optimized_plan_eq(expected: &str, plan: &LogicalPlan) {
+    fn assert_optimized_plan_eq(
+        expected: &str,
+        plan: &LogicalPlan,
+        config: Option<&dyn OptimizerConfig>,
+    ) {
         let optimizer = CommonSubexprEliminate {};
+        let default_config = OptimizerContext::new();
+        let config = config.unwrap_or(&default_config);
         let optimized_plan = optimizer
-            .try_optimize(plan, &OptimizerContext::new())
+            .try_optimize(plan, config)
             .unwrap()
             .expect("failed to optimize plan");
         let formatted_plan = format!("{optimized_plan:?}");
@@ -882,24 +859,9 @@ mod test {
     fn id_array_visitor() -> Result<()> {
         let expr = ((sum(col("a") + lit(1))) - avg(col("c"))) * lit(2);
 
-        let schema = Arc::new(DFSchema::from_unqualifed_fields(
-            vec![
-                Field::new("a", DataType::Int64, false),
-                Field::new("c", DataType::Int64, false),
-            ]
-            .into(),
-            Default::default(),
-        )?);
-
         // skip aggregates
         let mut id_array = vec![];
-        expr_to_identifier(
-            &expr,
-            &mut HashMap::new(),
-            &mut id_array,
-            Arc::clone(&schema),
-            ExprMask::Normal,
-        )?;
+        expr_to_identifier(&expr, &mut HashMap::new(), &mut id_array, 
ExprMask::Normal)?;
 
         let expected = vec![
             (8, "{(sum(a + Int32(1)) - AVG(c)) * Int32(2)|{Int32(2)}|{sum(a + 
Int32(1)) - AVG(c)|{AVG(c)|{c}}|{sum(a + Int32(1))|{a + 
Int32(1)|{Int32(1)}|{a}}}}}"),
@@ -923,7 +885,6 @@ mod test {
             &expr,
             &mut HashMap::new(),
             &mut id_array,
-            Arc::clone(&schema),
             ExprMask::NormalAndAggregates,
         )?;
 
@@ -968,11 +929,11 @@ mod test {
             )?
             .build()?;
 
-        let expected = "Aggregate: groupBy=[[]], aggr=[[sum({test.a * 
(Int32(1) - test.b)|{Int32(1) - test.b|{test.b}|{Int32(1)}}|{test.a}} AS test.a 
* Int32(1) - test.b), sum({test.a * (Int32(1) - test.b)|{Int32(1) - 
test.b|{test.b}|{Int32(1)}}|{test.a}} AS test.a * Int32(1) - test.b * (Int32(1) 
+ test.c))]]\
-        \n  Projection: test.a * (Int32(1) - test.b) AS {test.a * (Int32(1) - 
test.b)|{Int32(1) - test.b|{test.b}|{Int32(1)}}|{test.a}}, test.a, test.b, 
test.c\
+        let expected = "Aggregate: groupBy=[[]], aggr=[[sum(__common_expr_1 AS 
test.a * Int32(1) - test.b), sum(__common_expr_1 AS test.a * Int32(1) - test.b 
* (Int32(1) + test.c))]]\
+        \n  Projection: test.a * (Int32(1) - test.b) AS __common_expr_1, 
test.a, test.b, test.c\
         \n    TableScan: test";
 
-        assert_optimized_plan_eq(expected, &plan);
+        assert_optimized_plan_eq(expected, &plan, None);
 
         Ok(())
     }
@@ -988,11 +949,11 @@ mod test {
             ])?
             .build()?;
 
-        let expected = "Projection: {test.a + test.b|{test.b}|{test.a}} - 
test.c AS alias1 * {test.a + test.b|{test.b}|{test.a}} AS test.a + test.b, 
{test.a + test.b|{test.b}|{test.a}} AS test.a + test.b\
-        \n  Projection: test.a + test.b AS {test.a + 
test.b|{test.b}|{test.a}}, test.a, test.b, test.c\
+        let expected = "Projection: __common_expr_1 - test.c AS alias1 * 
__common_expr_1 AS test.a + test.b, __common_expr_1 AS test.a + test.b\
+        \n  Projection: test.a + test.b AS __common_expr_1, test.a, test.b, 
test.c\
         \n    TableScan: test";
 
-        assert_optimized_plan_eq(expected, &plan);
+        assert_optimized_plan_eq(expected, &plan, None);
 
         Ok(())
     }
@@ -1041,11 +1002,11 @@ mod test {
             )?
             .build()?;
 
-        let expected = "Projection: {AVG(test.a)|{test.a}} AS col1, 
{AVG(test.a)|{test.a}} AS col2, col3, {AVG(test.c)} AS AVG(test.c), 
{my_agg(test.a)|{test.a}} AS col4, {my_agg(test.a)|{test.a}} AS col5, col6, 
{my_agg(test.c)} AS my_agg(test.c)\
-        \n  Aggregate: groupBy=[[]], aggr=[[AVG(test.a) AS 
{AVG(test.a)|{test.a}}, my_agg(test.a) AS {my_agg(test.a)|{test.a}}, 
AVG(test.b) AS col3, AVG(test.c) AS {AVG(test.c)}, my_agg(test.b) AS col6, 
my_agg(test.c) AS {my_agg(test.c)}]]\
+        let expected = "Projection: __common_expr_1 AS col1, __common_expr_1 
AS col2, col3, __common_expr_3 AS AVG(test.c), __common_expr_2 AS col4, 
__common_expr_2 AS col5, col6, __common_expr_4 AS my_agg(test.c)\
+        \n  Aggregate: groupBy=[[]], aggr=[[AVG(test.a) AS __common_expr_1, 
my_agg(test.a) AS __common_expr_2, AVG(test.b) AS col3, AVG(test.c) AS 
__common_expr_3, my_agg(test.b) AS col6, my_agg(test.c) AS __common_expr_4]]\
         \n    TableScan: test";
 
-        assert_optimized_plan_eq(expected, &plan);
+        assert_optimized_plan_eq(expected, &plan, None);
 
         // test: trafo after aggregate
         let plan = LogicalPlanBuilder::from(table_scan.clone())
@@ -1060,11 +1021,11 @@ mod test {
             )?
             .build()?;
 
-        let expected = "Projection: Int32(1) + {AVG(test.a)|{test.a}} AS 
AVG(test.a), Int32(1) - {AVG(test.a)|{test.a}} AS AVG(test.a), Int32(1) + 
{my_agg(test.a)|{test.a}} AS my_agg(test.a), Int32(1) - 
{my_agg(test.a)|{test.a}} AS my_agg(test.a)\
-        \n  Aggregate: groupBy=[[]], aggr=[[AVG(test.a) AS 
{AVG(test.a)|{test.a}}, my_agg(test.a) AS {my_agg(test.a)|{test.a}}]]\
+        let expected = "Projection: Int32(1) + __common_expr_1 AS AVG(test.a), 
Int32(1) - __common_expr_1 AS AVG(test.a), Int32(1) + __common_expr_2 AS 
my_agg(test.a), Int32(1) - __common_expr_2 AS my_agg(test.a)\
+        \n  Aggregate: groupBy=[[]], aggr=[[AVG(test.a) AS __common_expr_1, 
my_agg(test.a) AS __common_expr_2]]\
         \n    TableScan: test";
 
-        assert_optimized_plan_eq(expected, &plan);
+        assert_optimized_plan_eq(expected, &plan, None);
 
         // test: transformation before aggregate
         let plan = LogicalPlanBuilder::from(table_scan.clone())
@@ -1077,9 +1038,11 @@ mod test {
             )?
             .build()?;
 
-        let expected = "Aggregate: groupBy=[[]], aggr=[[AVG({UInt32(1) + 
test.a|{test.a}|{UInt32(1)}}) AS col1, my_agg({UInt32(1) + 
test.a|{test.a}|{UInt32(1)}}) AS col2]]\n  Projection: UInt32(1) + test.a AS 
{UInt32(1) + test.a|{test.a}|{UInt32(1)}}, test.a, test.b, test.c\n    
TableScan: test";
+        let expected ="Aggregate: groupBy=[[]], aggr=[[AVG(__common_expr_1) AS 
col1, my_agg(__common_expr_1) AS col2]]\
+        \n  Projection: UInt32(1) + test.a AS __common_expr_1, test.a, test.b, 
test.c\
+        \n    TableScan: test";
 
-        assert_optimized_plan_eq(expected, &plan);
+        assert_optimized_plan_eq(expected, &plan, None);
 
         // test: common between agg and group
         let plan = LogicalPlanBuilder::from(table_scan.clone())
@@ -1092,11 +1055,11 @@ mod test {
             )?
             .build()?;
 
-        let expected = "Aggregate: groupBy=[[{UInt32(1) + 
test.a|{test.a}|{UInt32(1)}} AS UInt32(1) + test.a]], aggr=[[AVG({UInt32(1) + 
test.a|{test.a}|{UInt32(1)}}) AS col1, my_agg({UInt32(1) + 
test.a|{test.a}|{UInt32(1)}}) AS col2]]\
-        \n  Projection: UInt32(1) + test.a AS {UInt32(1) + 
test.a|{test.a}|{UInt32(1)}}, test.a, test.b, test.c\
+        let expected = "Aggregate: groupBy=[[__common_expr_1 AS UInt32(1) + 
test.a]], aggr=[[AVG(__common_expr_1) AS col1, my_agg(__common_expr_1) AS 
col2]]\
+        \n  Projection: UInt32(1) + test.a AS __common_expr_1, test.a, test.b, 
test.c\
         \n    TableScan: test";
 
-        assert_optimized_plan_eq(expected, &plan);
+        assert_optimized_plan_eq(expected, &plan, None);
 
         // test: all mixed
         let plan = LogicalPlanBuilder::from(table_scan)
@@ -1113,18 +1076,18 @@ mod test {
             )?
             .build()?;
 
-        let expected = "Projection: UInt32(1) + test.a, UInt32(1) + 
{AVG({UInt32(1) + test.a|{test.a}|{UInt32(1)}})|{{UInt32(1) + 
test.a|{test.a}|{UInt32(1)}}}} AS col1, UInt32(1) - {AVG({UInt32(1) + 
test.a|{test.a}|{UInt32(1)}})|{{UInt32(1) + test.a|{test.a}|{UInt32(1)}}}} AS 
col2, {AVG({UInt32(1) + test.a|{test.a}|{UInt32(1)}} AS UInt32(1) + test.a)} AS 
AVG(UInt32(1) + test.a), UInt32(1) + {my_agg({UInt32(1) + 
test.a|{test.a}|{UInt32(1)}})|{{UInt32(1) + test.a|{test.a}|{UInt32(1)}}}} A 
[...]
-        \n  Aggregate: groupBy=[[{UInt32(1) + test.a|{test.a}|{UInt32(1)}} AS 
UInt32(1) + test.a]], aggr=[[AVG({UInt32(1) + test.a|{test.a}|{UInt32(1)}}) AS 
{AVG({UInt32(1) + test.a|{test.a}|{UInt32(1)}})|{{UInt32(1) + 
test.a|{test.a}|{UInt32(1)}}}}, my_agg({UInt32(1) + 
test.a|{test.a}|{UInt32(1)}}) AS {my_agg({UInt32(1) + 
test.a|{test.a}|{UInt32(1)}})|{{UInt32(1) + test.a|{test.a}|{UInt32(1)}}}}, 
AVG({UInt32(1) + test.a|{test.a}|{UInt32(1)}} AS UInt32(1) + test.a) AS 
{AVG({UInt32(1) + t [...]
-        \n    Projection: UInt32(1) + test.a AS {UInt32(1) + 
test.a|{test.a}|{UInt32(1)}}, test.a, test.b, test.c\
+        let expected = "Projection: UInt32(1) + test.a, UInt32(1) + 
__common_expr_2 AS col1, UInt32(1) - __common_expr_2 AS col2, __common_expr_4 
AS AVG(UInt32(1) + test.a), UInt32(1) + __common_expr_3 AS col3, UInt32(1) - 
__common_expr_3 AS col4, __common_expr_5 AS my_agg(UInt32(1) + test.a)\
+        \n  Aggregate: groupBy=[[__common_expr_1 AS UInt32(1) + test.a]], 
aggr=[[AVG(__common_expr_1) AS __common_expr_2, my_agg(__common_expr_1) AS 
__common_expr_3, AVG(__common_expr_1 AS UInt32(1) + test.a) AS __common_expr_4, 
my_agg(__common_expr_1 AS UInt32(1) + test.a) AS __common_expr_5]]\
+        \n    Projection: UInt32(1) + test.a AS __common_expr_1, test.a, 
test.b, test.c\
         \n      TableScan: test";
 
-        assert_optimized_plan_eq(expected, &plan);
+        assert_optimized_plan_eq(expected, &plan, None);
 
         Ok(())
     }
 
     #[test]
-    fn aggregate_with_releations_and_dots() -> Result<()> {
+    fn aggregate_with_relations_and_dots() -> Result<()> {
         let schema = Schema::new(vec![Field::new("col.a", DataType::UInt32, 
false)]);
         let table_scan = table_scan(Some("table.test"), &schema, 
None)?.build()?;
 
@@ -1140,12 +1103,12 @@ mod test {
             )?
             .build()?;
 
-        let expected = "Projection: table.test.col.a, UInt32(1) + 
{AVG({UInt32(1) + table.test.col.a|{table.test.col.a}|{UInt32(1)}} AS UInt32(1) 
+ table.test.col.a)|{{UInt32(1) + 
table.test.col.a|{table.test.col.a}|{UInt32(1)}} AS UInt32(1) + 
table.test.col.a|{{UInt32(1) + 
table.test.col.a|{table.test.col.a}|{UInt32(1)}}}}} AS AVG(UInt32(1) + 
table.test.col.a), {AVG({UInt32(1) + 
table.test.col.a|{table.test.col.a}|{UInt32(1)}} AS UInt32(1) + 
table.test.col.a)|{{UInt32(1) + table.test.co [...]
-        \n  Aggregate: groupBy=[[table.test.col.a]], aggr=[[AVG({UInt32(1) + 
table.test.col.a|{table.test.col.a}|{UInt32(1)}} AS UInt32(1) + 
table.test.col.a) AS {AVG({UInt32(1) + 
table.test.col.a|{table.test.col.a}|{UInt32(1)}} AS UInt32(1) + 
table.test.col.a)|{{UInt32(1) + 
table.test.col.a|{table.test.col.a}|{UInt32(1)}} AS UInt32(1) + 
table.test.col.a|{{UInt32(1) + 
table.test.col.a|{table.test.col.a}|{UInt32(1)}}}}}]]\
-        \n    Projection: UInt32(1) + table.test.col.a AS {UInt32(1) + 
table.test.col.a|{table.test.col.a}|{UInt32(1)}}, table.test.col.a\
+        let expected = "Projection: table.test.col.a, UInt32(1) + 
__common_expr_2 AS AVG(UInt32(1) + table.test.col.a), __common_expr_2 AS 
AVG(UInt32(1) + table.test.col.a)\
+        \n  Aggregate: groupBy=[[table.test.col.a]], 
aggr=[[AVG(__common_expr_1 AS UInt32(1) + table.test.col.a) AS 
__common_expr_2]]\
+        \n    Projection: UInt32(1) + table.test.col.a AS __common_expr_1, 
table.test.col.a\
         \n      TableScan: table.test";
 
-        assert_optimized_plan_eq(expected, &plan);
+        assert_optimized_plan_eq(expected, &plan, None);
 
         Ok(())
     }
@@ -1161,11 +1124,11 @@ mod test {
             ])?
             .build()?;
 
-        let expected = "Projection: {Int32(1) + test.a|{test.a}|{Int32(1)}} AS 
first, {Int32(1) + test.a|{test.a}|{Int32(1)}} AS second\
-        \n  Projection: Int32(1) + test.a AS {Int32(1) + 
test.a|{test.a}|{Int32(1)}}, test.a, test.b, test.c\
+        let expected = "Projection: __common_expr_1 AS first, __common_expr_1 
AS second\
+        \n  Projection: Int32(1) + test.a AS __common_expr_1, test.a, test.b, 
test.c\
         \n    TableScan: test";
 
-        assert_optimized_plan_eq(expected, &plan);
+        assert_optimized_plan_eq(expected, &plan, None);
 
         Ok(())
     }
@@ -1181,7 +1144,7 @@ mod test {
         let expected = "Projection: Int32(1) + test.a, test.a + Int32(1)\
         \n  TableScan: test";
 
-        assert_optimized_plan_eq(expected, &plan);
+        assert_optimized_plan_eq(expected, &plan, None);
 
         Ok(())
     }
@@ -1199,35 +1162,35 @@ mod test {
         \n  Projection: Int32(1) + test.a, test.a\
         \n    TableScan: test";
 
-        assert_optimized_plan_eq(expected, &plan);
+        assert_optimized_plan_eq(expected, &plan, None);
         Ok(())
     }
 
     #[test]
     fn redundant_project_fields() {
         let table_scan = test_table_scan().unwrap();
-        let expr_stats_1 = ExprStats::from([
-            ("c+a".to_string(), (1, DataType::UInt32)),
-            ("b+a".to_string(), (1, DataType::UInt32)),
-        ]);
-        let common_exprs_1 = IndexMap::from([
-            ("c+a".to_string(), col("c") + col("a")),
-            ("b+a".to_string(), col("b") + col("a")),
-        ]);
-        let exprs_stats_2 = ExprStats::from([
-            ("c+a".to_string(), (1, DataType::UInt32)),
-            ("b+a".to_string(), (1, DataType::UInt32)),
+        let common_exprs_1 = CommonExprs::from([
+            (
+                "c+a".to_string(),
+                (col("c") + col("a"), format!("{CSE_PREFIX}_1")),
+            ),
+            (
+                "b+a".to_string(),
+                (col("b") + col("a"), format!("{CSE_PREFIX}_2")),
+            ),
         ]);
-        let common_exprs_2 = IndexMap::from([
-            ("c+a".to_string(), col("c+a")),
-            ("b+a".to_string(), col("b+a")),
+        let common_exprs_2 = CommonExprs::from([
+            (
+                "c+a".to_string(),
+                (col(format!("{CSE_PREFIX}_1")), format!("{CSE_PREFIX}_3")),
+            ),
+            (
+                "b+a".to_string(),
+                (col(format!("{CSE_PREFIX}_2")), format!("{CSE_PREFIX}_4")),
+            ),
         ]);
-        let project =
-            build_common_expr_project_plan(table_scan, common_exprs_1, 
&expr_stats_1)
-                .unwrap();
-        let project_2 =
-            build_common_expr_project_plan(project, common_exprs_2, 
&exprs_stats_2)
-                .unwrap();
+        let project = build_common_expr_project_plan(table_scan, 
common_exprs_1).unwrap();
+        let project_2 = build_common_expr_project_plan(project, 
common_exprs_2).unwrap();
 
         let mut field_set = BTreeSet::new();
         for name in project_2.schema().field_names() {
@@ -1244,33 +1207,28 @@ mod test {
             .unwrap()
             .build()
             .unwrap();
-        let expr_stats_1 = ExprStats::from([
-            ("test1.c+test1.a".to_string(), (1, DataType::UInt32)),
-            ("test1.b+test1.a".to_string(), (1, DataType::UInt32)),
-        ]);
-        let common_exprs_1 = IndexMap::from([
+        let common_exprs_1 = CommonExprs::from([
             (
                 "test1.c+test1.a".to_string(),
-                col("test1.c") + col("test1.a"),
+                (col("test1.c") + col("test1.a"), format!("{CSE_PREFIX}_1")),
             ),
             (
                 "test1.b+test1.a".to_string(),
-                col("test1.b") + col("test1.a"),
+                (col("test1.b") + col("test1.a"), format!("{CSE_PREFIX}_2")),
             ),
         ]);
-        let expr_stats_2 = ExprStats::from([
-            ("test1.c+test1.a".to_string(), (1, DataType::UInt32)),
-            ("test1.b+test1.a".to_string(), (1, DataType::UInt32)),
-        ]);
-        let common_exprs_2 = IndexMap::from([
-            ("test1.c+test1.a".to_string(), col("test1.c+test1.a")),
-            ("test1.b+test1.a".to_string(), col("test1.b+test1.a")),
+        let common_exprs_2 = CommonExprs::from([
+            (
+                "test1.c+test1.a".to_string(),
+                (col(format!("{CSE_PREFIX}_1")), format!("{CSE_PREFIX}_3")),
+            ),
+            (
+                "test1.b+test1.a".to_string(),
+                (col(format!("{CSE_PREFIX}_2")), format!("{CSE_PREFIX}_4")),
+            ),
         ]);
-        let project =
-            build_common_expr_project_plan(join, common_exprs_1, 
&expr_stats_1).unwrap();
-        let project_2 =
-            build_common_expr_project_plan(project, common_exprs_2, 
&expr_stats_2)
-                .unwrap();
+        let project = build_common_expr_project_plan(join, 
common_exprs_1).unwrap();
+        let project_2 = build_common_expr_project_plan(project, 
common_exprs_2).unwrap();
 
         let mut field_set = BTreeSet::new();
         for name in project_2.schema().field_names() {
@@ -1337,11 +1295,11 @@ mod test {
             .build()?;
 
         let expected = "Projection: test.a, test.b, test.c\
-        \n  Filter: {Int32(1) + test.a|{test.a}|{Int32(1)}} - Int32(10) > 
{Int32(1) + test.a|{test.a}|{Int32(1)}}\
-        \n    Projection: Int32(1) + test.a AS {Int32(1) + 
test.a|{test.a}|{Int32(1)}}, test.a, test.b, test.c\
+        \n  Filter: __common_expr_1 - Int32(10) > __common_expr_1\
+        \n    Projection: Int32(1) + test.a AS __common_expr_1, test.a, 
test.b, test.c\
         \n      TableScan: test";
 
-        assert_optimized_plan_eq(expected, &plan);
+        assert_optimized_plan_eq(expected, &plan, None);
 
         Ok(())
     }
@@ -1383,6 +1341,58 @@ mod test {
         Ok(())
     }
 
+    #[test]
+    fn test_alias_collision() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        let config = &OptimizerContext::new();
+        let common_expr_1 = config.alias_generator().next(CSE_PREFIX);
+        let plan = LogicalPlanBuilder::from(table_scan.clone())
+            .project(vec![
+                (col("a") + col("b")).alias(common_expr_1.clone()),
+                col("c"),
+            ])?
+            .project(vec![
+                col(common_expr_1.clone()).alias("c1"),
+                col(common_expr_1).alias("c2"),
+                (col("c") + lit(2)).alias("c3"),
+                (col("c") + lit(2)).alias("c4"),
+            ])?
+            .build()?;
+
+        let expected = "Projection: __common_expr_1 AS c1, __common_expr_1 AS 
c2, __common_expr_2 AS c3, __common_expr_2 AS c4\
+        \n  Projection: test.c + Int32(2) AS __common_expr_2, __common_expr_1, 
test.c\
+        \n    Projection: test.a + test.b AS __common_expr_1, test.c\
+        \n      TableScan: test";
+
+        assert_optimized_plan_eq(expected, &plan, Some(config));
+
+        let config = &OptimizerContext::new();
+        let _common_expr_1 = config.alias_generator().next(CSE_PREFIX);
+        let common_expr_2 = config.alias_generator().next(CSE_PREFIX);
+        let plan = LogicalPlanBuilder::from(table_scan.clone())
+            .project(vec![
+                (col("a") + col("b")).alias(common_expr_2.clone()),
+                col("c"),
+            ])?
+            .project(vec![
+                col(common_expr_2.clone()).alias("c1"),
+                col(common_expr_2).alias("c2"),
+                (col("c") + lit(2)).alias("c3"),
+                (col("c") + lit(2)).alias("c4"),
+            ])?
+            .build()?;
+
+        let expected = "Projection: __common_expr_2 AS c1, __common_expr_2 AS 
c2, __common_expr_3 AS c3, __common_expr_3 AS c4\
+        \n  Projection: test.c + Int32(2) AS __common_expr_3, __common_expr_2, 
test.c\
+        \n    Projection: test.a + test.b AS __common_expr_2, test.c\
+        \n      TableScan: test";
+
+        assert_optimized_plan_eq(expected, &plan, Some(config));
+
+        Ok(())
+    }
+
     #[test]
     fn test_extract_expressions_from_col() -> Result<()> {
         let mut result = Vec::with_capacity(1);
diff --git a/datafusion/sqllogictest/test_files/group_by.slt 
b/datafusion/sqllogictest/test_files/group_by.slt
index 9e8a2450e0..8ccf3ae853 100644
--- a/datafusion/sqllogictest/test_files/group_by.slt
+++ b/datafusion/sqllogictest/test_files/group_by.slt
@@ -4187,8 +4187,8 @@ EXPLAIN SELECT SUM(DISTINCT CAST(x AS DOUBLE)), 
MAX(DISTINCT CAST(x AS DOUBLE))
 logical_plan
 01)Projection: sum(alias1) AS sum(DISTINCT t1.x), MAX(alias1) AS MAX(DISTINCT 
t1.x)
 02)--Aggregate: groupBy=[[t1.y]], aggr=[[sum(alias1), MAX(alias1)]]
-03)----Aggregate: groupBy=[[t1.y, {CAST(t1.x AS Float64)|{t1.x}} AS t1.x AS 
alias1]], aggr=[[]]
-04)------Projection: CAST(t1.x AS Float64) AS {CAST(t1.x AS Float64)|{t1.x}}, 
t1.y
+03)----Aggregate: groupBy=[[t1.y, __common_expr_1 AS t1.x AS alias1]], 
aggr=[[]]
+04)------Projection: CAST(t1.x AS Float64) AS __common_expr_1, t1.y
 05)--------TableScan: t1 projection=[x, y]
 physical_plan
 01)ProjectionExec: expr=[sum(alias1)@1 as sum(DISTINCT t1.x), MAX(alias1)@2 as 
MAX(DISTINCT t1.x)]
@@ -4200,8 +4200,8 @@ physical_plan
 07)------------CoalesceBatchesExec: target_batch_size=2
 08)--------------RepartitionExec: partitioning=Hash([y@0, alias1@1], 8), 
input_partitions=8
 09)----------------RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1
-10)------------------AggregateExec: mode=Partial, gby=[y@1 as y, {CAST(t1.x AS 
Float64)|{t1.x}}@0 as alias1], aggr=[]
-11)--------------------ProjectionExec: expr=[CAST(x@0 AS Float64) as 
{CAST(t1.x AS Float64)|{t1.x}}, y@1 as y]
+10)------------------AggregateExec: mode=Partial, gby=[y@1 as y, 
__common_expr_1@0 as alias1], aggr=[]
+11)--------------------ProjectionExec: expr=[CAST(x@0 AS Float64) as 
__common_expr_1, y@1 as y]
 12)----------------------MemoryExec: partitions=1, partition_sizes=[1]
 
 # create an unbounded table that contains ordered timestamp.
diff --git a/datafusion/sqllogictest/test_files/select.slt 
b/datafusion/sqllogictest/test_files/select.slt
index aae35c1ce7..c8ef2b7f5e 100644
--- a/datafusion/sqllogictest/test_files/select.slt
+++ b/datafusion/sqllogictest/test_files/select.slt
@@ -1436,12 +1436,12 @@ query TT
 EXPLAIN SELECT x/2, x/2+1 FROM t;
 ----
 logical_plan
-01)Projection: {t.x / Int64(2)|{Int64(2)}|{t.x}} AS t.x / Int64(2), {t.x / 
Int64(2)|{Int64(2)}|{t.x}} AS t.x / Int64(2) + Int64(1)
-02)--Projection: t.x / Int64(2) AS {t.x / Int64(2)|{Int64(2)}|{t.x}}
+01)Projection: __common_expr_1 AS t.x / Int64(2), __common_expr_1 AS t.x / 
Int64(2) + Int64(1)
+02)--Projection: t.x / Int64(2) AS __common_expr_1
 03)----TableScan: t projection=[x]
 physical_plan
-01)ProjectionExec: expr=[{t.x / Int64(2)|{Int64(2)}|{t.x}}@0 as t.x / 
Int64(2), {t.x / Int64(2)|{Int64(2)}|{t.x}}@0 + 1 as t.x / Int64(2) + Int64(1)]
-02)--ProjectionExec: expr=[x@0 / 2 as {t.x / Int64(2)|{Int64(2)}|{t.x}}]
+01)ProjectionExec: expr=[__common_expr_1@0 as t.x / Int64(2), 
__common_expr_1@0 + 1 as t.x / Int64(2) + Int64(1)]
+02)--ProjectionExec: expr=[x@0 / 2 as __common_expr_1]
 03)----MemoryExec: partitions=1, partition_sizes=[1]
 
 query II
@@ -1454,12 +1454,12 @@ query TT
 EXPLAIN SELECT abs(x), abs(x) + abs(y) FROM t;
 ----
 logical_plan
-01)Projection: {abs(t.x)|{t.x}} AS abs(t.x), {abs(t.x)|{t.x}} AS abs(t.x) + 
abs(t.y)
-02)--Projection: abs(t.x) AS {abs(t.x)|{t.x}}, t.y
+01)Projection: __common_expr_1 AS abs(t.x), __common_expr_1 AS abs(t.x) + 
abs(t.y)
+02)--Projection: abs(t.x) AS __common_expr_1, t.y
 03)----TableScan: t projection=[x, y]
 physical_plan
-01)ProjectionExec: expr=[{abs(t.x)|{t.x}}@0 as abs(t.x), {abs(t.x)|{t.x}}@0 + 
abs(y@1) as abs(t.x) + abs(t.y)]
-02)--ProjectionExec: expr=[abs(x@0) as {abs(t.x)|{t.x}}, y@1 as y]
+01)ProjectionExec: expr=[__common_expr_1@0 as abs(t.x), __common_expr_1@0 + 
abs(y@1) as abs(t.x) + abs(t.y)]
+02)--ProjectionExec: expr=[abs(x@0) as __common_expr_1, y@1 as y]
 03)----MemoryExec: partitions=1, partition_sizes=[1]
 
 query II
diff --git a/datafusion/sqllogictest/test_files/subquery.slt 
b/datafusion/sqllogictest/test_files/subquery.slt
index eb0904b230..f325d55676 100644
--- a/datafusion/sqllogictest/test_files/subquery.slt
+++ b/datafusion/sqllogictest/test_files/subquery.slt
@@ -1082,8 +1082,8 @@ query TT
 explain select a/2, a/2 + 1 from t
 ----
 logical_plan
-01)Projection: {t.a / Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2), {t.a / 
Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2) + Int64(1)
-02)--Projection: t.a / Int64(2) AS {t.a / Int64(2)|{Int64(2)}|{t.a}}
+01)Projection: __common_expr_1 AS t.a / Int64(2), __common_expr_1 AS t.a / 
Int64(2) + Int64(1)
+02)--Projection: t.a / Int64(2) AS __common_expr_1
 03)----TableScan: t projection=[a]
 
 statement ok
@@ -1093,8 +1093,8 @@ query TT
 explain select a/2, a/2 + 1 from t
 ----
 logical_plan
-01)Projection: {t.a / Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2), {t.a / 
Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2) + Int64(1)
-02)--Projection: t.a / Int64(2) AS {t.a / Int64(2)|{Int64(2)}|{t.a}}
+01)Projection: __common_expr_1 AS t.a / Int64(2), __common_expr_1 AS t.a / 
Int64(2) + Int64(1)
+02)--Projection: t.a / Int64(2) AS __common_expr_1
 03)----TableScan: t projection=[a]
 
 ###
diff --git a/datafusion/sqllogictest/test_files/tpch/q1.slt.part 
b/datafusion/sqllogictest/test_files/tpch/q1.slt.part
index 5e0930b992..5a21bdf276 100644
--- a/datafusion/sqllogictest/test_files/tpch/q1.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/q1.slt.part
@@ -42,8 +42,8 @@ explain select
 logical_plan
 01)Sort: lineitem.l_returnflag ASC NULLS LAST, lineitem.l_linestatus ASC NULLS 
LAST
 02)--Projection: lineitem.l_returnflag, lineitem.l_linestatus, 
sum(lineitem.l_quantity) AS sum_qty, sum(lineitem.l_extendedprice) AS 
sum_base_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) 
AS sum_disc_price, sum(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount * Int64(1) + lineitem.l_tax) AS sum_charge, 
AVG(lineitem.l_quantity) AS avg_qty, AVG(lineitem.l_extendedprice) AS 
avg_price, AVG(lineitem.l_discount) AS avg_disc, COUNT(*) AS count_order
-03)----Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]], 
aggr=[[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), 
sum({lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - 
lineitem.l_discount)|{Decimal128(Some(1),20,0) - 
lineitem.l_discount|{lineitem.l_discount}|{Decimal128(Some(1),20,0)}}|{lineitem.l_extendedprice}})
 AS sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), 
sum({lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discou 
[...]
-04)------Projection: lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - 
lineitem.l_discount) AS {lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - 
lineitem.l_discount)|{Decimal128(Some(1),20,0) - 
lineitem.l_discount|{lineitem.l_discount}|{Decimal128(Some(1),20,0)}}|{lineitem.l_extendedprice}},
 lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, 
lineitem.l_tax, lineitem.l_returnflag, lineitem.l_linestatus
+03)----Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]], 
aggr=[[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), 
sum(__common_expr_1) AS sum(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount), sum(__common_expr_1 * (Decimal128(Some(1),20,0) + 
lineitem.l_tax)) AS sum(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), 
AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(Int64(1)) AS  
[...]
+04)------Projection: lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - 
lineitem.l_discount) AS __common_expr_1, lineitem.l_quantity, 
lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_tax, 
lineitem.l_returnflag, lineitem.l_linestatus
 05)--------Filter: lineitem.l_shipdate <= Date32("1998-09-02")
 06)----------TableScan: lineitem projection=[l_quantity, l_extendedprice, 
l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], 
partial_filters=[lineitem.l_shipdate <= Date32("1998-09-02")]
 physical_plan
@@ -54,7 +54,7 @@ physical_plan
 05)--------CoalesceBatchesExec: target_batch_size=8192
 06)----------RepartitionExec: partitioning=Hash([l_returnflag@0, 
l_linestatus@1], 4), input_partitions=4
 07)------------AggregateExec: mode=Partial, gby=[l_returnflag@5 as 
l_returnflag, l_linestatus@6 as l_linestatus], aggr=[sum(lineitem.l_quantity), 
sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), 
AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(*)]
-08)--------------ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 - 
l_discount@2) as {lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - 
lineitem.l_discount)|{Decimal128(Some(1),20,0) - 
lineitem.l_discount|{lineitem.l_discount}|{Decimal128(Some(1),20,0)}}|{lineitem.l_extendedprice}},
 l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 
as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 
as l_linestatus]
+08)--------------ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 - 
l_discount@2) as __common_expr_1, l_quantity@0 as l_quantity, l_extendedprice@1 
as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax, 
l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus]
 09)----------------CoalesceBatchesExec: target_batch_size=8192
 10)------------------FilterExec: l_shipdate@6 <= 1998-09-02
 11)--------------------CsvExec: file_groups={4 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]},
 projection=[l_quantity, l_extendedprice, l_discount, l_tax, l [...]
diff --git a/datafusion/sqllogictest/test_files/window.slt 
b/datafusion/sqllogictest/test_files/window.slt
index 99f92b65c3..77b839f3f7 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -1718,20 +1718,20 @@ EXPLAIN SELECT c3,
 logical_plan
 01)Projection: aggregate_test_100.c3, sum(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum1, 
sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum2
 02)--Limit: skip=0, fetch=5
-03)----WindowAggr: windowExpr=[[sum(aggregate_test_100.c9) ORDER BY 
[{aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}} AS 
aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW AS sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING  [...]
-04)------Projection: {aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, 
aggregate_test_100.c3, aggregate_test_100.c9, sum(aggregate_test_100.c9) ORDER 
BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
-05)--------WindowAggr: windowExpr=[[sum(aggregate_test_100.c9) ORDER BY 
[{aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}} AS 
aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum(aggregate_test_100.c9) 
ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DE [...]
-06)----------Projection: aggregate_test_100.c3 + aggregate_test_100.c4 AS 
{aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, 
aggregate_test_100.c2, aggregate_test_100.c3, aggregate_test_100.c9
+03)----WindowAggr: windowExpr=[[sum(aggregate_test_100.c9) ORDER BY 
[__common_expr_1 AS aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS 
LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW AS sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+04)------Projection: __common_expr_1, aggregate_test_100.c3, 
aggregate_test_100.c9, sum(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+05)--------WindowAggr: windowExpr=[[sum(aggregate_test_100.c9) ORDER BY 
[__common_expr_1 AS aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS 
FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS 
sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, 
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PR [...]
+06)----------Projection: aggregate_test_100.c3 + aggregate_test_100.c4 AS 
__common_expr_1, aggregate_test_100.c2, aggregate_test_100.c3, 
aggregate_test_100.c9
 07)------------TableScan: aggregate_test_100 projection=[c2, c3, c4, c9]
 physical_plan
 01)ProjectionExec: expr=[c3@1 as c3, sum(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum1, 
sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum2]
 02)--GlobalLimitExec: skip=0, fetch=5
 03)----WindowAggExec: wdw=[sum(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW: Ok(Field { name: "sum(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, [...]
-04)------ProjectionExec: expr=[{aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}@0 as 
{aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, c3@2 as 
c3, c9@3 as c9, sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, 
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@4 as sum(a [...]
+04)------ProjectionExec: expr=[__common_expr_1@0 as __common_expr_1, c3@2 as 
c3, c9@3 as c9, sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, 
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@4 as sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, 
aggregate_test_100.c2 AS [...]
 05)--------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, 
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRE [...]
-06)----------SortPreservingMergeExec: [{aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}@0 
DESC,c9@3 DESC,c2@1 ASC NULLS LAST]
-07)------------SortExec: expr=[{aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}@0 
DESC,c9@3 DESC,c2@1 ASC NULLS LAST], preserve_partitioning=[true]
-08)--------------ProjectionExec: expr=[c3@1 + c4@2 as {aggregate_test_100.c3 + 
aggregate_test_100.c4|{aggregate_test_100.c4}|{aggregate_test_100.c3}}, c2@0 as 
c2, c3@1 as c3, c9@3 as c9]
+06)----------SortPreservingMergeExec: [__common_expr_1@0 DESC,c9@3 DESC,c2@1 
ASC NULLS LAST]
+07)------------SortExec: expr=[__common_expr_1@0 DESC,c9@3 DESC,c2@1 ASC NULLS 
LAST], preserve_partitioning=[true]
+08)--------------ProjectionExec: expr=[c3@1 + c4@2 as __common_expr_1, c2@0 as 
c2, c3@1 as c3, c9@3 as c9]
 09)----------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
 10)------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, 
c3, c4, c9], has_header=true
 
@@ -2574,11 +2574,11 @@ logical_plan
 02)--Limit: skip=0, fetch=5
 03)----Sort: annotated_data_finite.inc_col DESC NULLS FIRST, fetch=5
 04)------Projection: sum(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING AS sum1, sum(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 
FOLLOWING AS sum2, sum(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 
FOLLOWING AS sum3, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_dat 
[...]
-05)--------WindowAggr: windowExpr=[[sum({CAST(annotated_data_finite.desc_col 
AS Int64)|{annotated_data_finite.desc_col}} AS annotated_data_finite.desc_col) 
ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING, COUNT(Int64(1)) ROWS BETWEEN 8 
PRECEDING AND 1 FOLLOWING AS COUNT(*) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING]]
-06)----------Projection: {CAST(annotated_data_finite.desc_col AS 
Int64)|{annotated_data_finite.desc_col}}, annotated_data_finite.inc_col, 
sum(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, 
sum(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING, 
sum(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RO [...]
-07)------------WindowAggr: 
windowExpr=[[sum({CAST(annotated_data_finite.inc_col AS 
Int64)|{annotated_data_finite.inc_col}} AS annotated_data_finite.inc_col) ORDER 
BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING, sum({CAST(annotated_data_finite.desc_col AS 
Int64)|{annotated_data_finite.desc_col}} AS annotated_data_finite.desc_col) 
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING 
AND 1 FOLLOWING, sum({CAST(annotated_data_f [...]
-08)--------------WindowAggr: 
windowExpr=[[sum({CAST(annotated_data_finite.inc_col AS 
Int64)|{annotated_data_finite.inc_col}} AS annotated_data_finite.inc_col) ORDER 
BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 
FOLLOWING, sum({CAST(annotated_data_finite.desc_col AS 
Int64)|{annotated_data_finite.desc_col}} AS annotated_data_finite.desc_col) 
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING 
AND 8 FOLLOWING, sum({CAST(annotated_d [...]
-09)----------------Projection: CAST(annotated_data_finite.desc_col AS Int64) 
AS {CAST(annotated_data_finite.desc_col AS 
Int64)|{annotated_data_finite.desc_col}}, CAST(annotated_data_finite.inc_col AS 
Int64) AS {CAST(annotated_data_finite.inc_col AS 
Int64)|{annotated_data_finite.inc_col}}, annotated_data_finite.ts, 
annotated_data_finite.inc_col, annotated_data_finite.desc_col
+05)--------WindowAggr: windowExpr=[[sum(__common_expr_1 AS 
annotated_data_finite.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING, 
COUNT(Int64(1)) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING AS COUNT(*) ROWS 
BETWEEN 8 PRECEDING AND 1 FOLLOWING]]
+06)----------Projection: __common_expr_1, annotated_data_finite.inc_col, 
sum(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, 
sum(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING, 
sum(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, 
MIN(annotated_data_finite [...]
+07)------------WindowAggr: windowExpr=[[sum(__common_expr_2 AS 
annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS 
LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, sum(__common_expr_1 AS 
annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS 
LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, sum(__common_expr_2 AS 
annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS 
LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, MIN(an [...]
+08)--------------WindowAggr: windowExpr=[[sum(__common_expr_2 AS 
annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS 
FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, sum(__common_expr_1 AS 
annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS 
FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING, sum(__common_expr_1 AS 
annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS 
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, [...]
+09)----------------Projection: CAST(annotated_data_finite.desc_col AS Int64) 
AS __common_expr_1, CAST(annotated_data_finite.inc_col AS Int64) AS 
__common_expr_2, annotated_data_finite.ts, annotated_data_finite.inc_col, 
annotated_data_finite.desc_col
 10)------------------TableScan: annotated_data_finite projection=[ts, inc_col, 
desc_col]
 physical_plan
 01)ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, sum3@2 as sum3, 
min1@3 as min1, min2@4 as min2, min3@5 as min3, max1@6 as max1, max2@7 as max2, 
max3@8 as max3, cnt1@9 as cnt1, cnt2@10 as cnt2, sumr1@11 as sumr1, sumr2@12 as 
sumr2, sumr3@13 as sumr3, minr1@14 as minr1, minr2@15 as minr2, minr3@16 as 
minr3, maxr1@17 as maxr1, maxr2@18 as maxr2, maxr3@19 as maxr3, cntr1@20 as 
cntr1, cntr2@21 as cntr2, sum4@22 as sum4, cnt3@23 as cnt3]
@@ -2586,10 +2586,10 @@ physical_plan
 03)----SortExec: TopK(fetch=5), expr=[inc_col@24 DESC], 
preserve_partitioning=[false]
 04)------ProjectionExec: expr=[sum(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING@13 as sum1, sum(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 
FOLLOWING@14 as sum2, sum(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 
FOLLOWING@15 as sum3, MIN(annotated_data_finite.inc_col) ORDE [...]
 05)--------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.desc_col) ROWS 
BETWEEN 8 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"sum(annotated_data_finite.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(8)), 
end_bound: Following(UInt64(1)), is_causal: false }, COUNT(*) ROWS BETWEEN 8 
PRECEDING AND 1 FOLLOWING: Ok(Field { name: "C [...]
-06)----------ProjectionExec: expr=[{CAST(annotated_data_finite.desc_col AS 
Int64)|{annotated_data_finite.desc_col}}@0 as 
{CAST(annotated_data_finite.desc_col AS 
Int64)|{annotated_data_finite.desc_col}}, inc_col@3 as inc_col, 
sum(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING@5 as 
sum(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, s [...]
+06)----------ProjectionExec: expr=[__common_expr_1@0 as __common_expr_1, 
inc_col@3 as inc_col, sum(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 
FOLLOWING@5 as sum(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 
FOLLOWING, sum(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 
FOLLOWING@ [...]
 07)------------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING 
AND 1 FOLLOWING: Ok(Field { name: "sum(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Int32(10)), end_bound: Fol [...]
 08)--------------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING 
AND 4 FOLLOWING: Ok(Field { name: "sum(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Int32(4)), end_bound:  [...]
-09)----------------ProjectionExec: expr=[CAST(desc_col@2 AS Int64) as 
{CAST(annotated_data_finite.desc_col AS 
Int64)|{annotated_data_finite.desc_col}}, CAST(inc_col@1 AS Int64) as 
{CAST(annotated_data_finite.inc_col AS Int64)|{annotated_data_finite.inc_col}}, 
ts@0 as ts, inc_col@1 as inc_col, desc_col@2 as desc_col]
+09)----------------ProjectionExec: expr=[CAST(desc_col@2 AS Int64) as 
__common_expr_1, CAST(inc_col@1 AS Int64) as __common_expr_2, ts@0 as ts, 
inc_col@1 as inc_col, desc_col@2 as desc_col]
 10)------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, 
inc_col, desc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
 
 query IIIIIIIIIIIIIIIIIIIIIIII
@@ -2738,9 +2738,9 @@ logical_plan
 02)--Limit: skip=0, fetch=5
 03)----Sort: annotated_data_finite.inc_col ASC NULLS LAST, fetch=5
 04)------Projection: sum(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING AS sum1, sum(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING AS sum2, MIN(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING AS min1, MIN(annotated_data_finite.inc_col) [...]
-05)--------WindowAggr: windowExpr=[[sum({CAST(annotated_data_finite.inc_col AS 
Int64)|{annotated_data_finite.inc_col}} AS annotated_data_finite.inc_col) ORDER 
BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND 5 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN  [...]
-06)----------WindowAggr: windowExpr=[[sum({CAST(annotated_data_finite.inc_col 
AS Int64)|{annotated_data_finite.inc_col}} AS annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE  [...]
-07)------------Projection: CAST(annotated_data_finite.inc_col AS Int64) AS 
{CAST(annotated_data_finite.inc_col AS Int64)|{annotated_data_finite.inc_col}}, 
CAST(annotated_data_finite.inc_col AS Float64) AS 
{CAST(annotated_data_finite.inc_col AS 
Float64)|{annotated_data_finite.inc_col}}, annotated_data_finite.ts, 
annotated_data_finite.inc_col
+05)--------WindowAggr: windowExpr=[[sum(__common_expr_1 AS 
annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING, 
MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING, 
MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING, 
COUNT(annotated_data_finit [...]
+06)----------WindowAggr: windowExpr=[[sum(__common_expr_1 AS 
annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS 
FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, 
MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, 
MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, 
COUNT(annotated_da [...]
+07)------------Projection: CAST(annotated_data_finite.inc_col AS Int64) AS 
__common_expr_1, CAST(annotated_data_finite.inc_col AS Float64) AS 
__common_expr_2, annotated_data_finite.ts, annotated_data_finite.inc_col
 08)--------------TableScan: annotated_data_finite projection=[ts, inc_col]
 physical_plan
 01)ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, min1@2 as min1, 
min2@3 as min2, max1@4 as max1, max2@5 as max2, count1@6 as count1, count2@7 as 
count2, avg1@8 as avg1, avg2@9 as avg2]
@@ -2749,7 +2749,7 @@ physical_plan
 04)------ProjectionExec: expr=[sum(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING@9 as sum1, sum(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING@4 as sum2, MIN(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING@10 as min1, MIN(annotated_dat [...]
 05)--------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.inc_col) ORDER 
BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND 5 FOLLOWING: Ok(Field { name: "sum(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Int32(NULL)), en [...]
 06)----------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING: Ok(Field { name: "sum(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int32(NULL [...]
-07)------------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as 
{CAST(annotated_data_finite.inc_col AS Int64)|{annotated_data_finite.inc_col}}, 
CAST(inc_col@1 AS Float64) as {CAST(annotated_data_finite.inc_col AS 
Float64)|{annotated_data_finite.inc_col}}, ts@0 as ts, inc_col@1 as inc_col]
+07)------------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as 
__common_expr_1, CAST(inc_col@1 AS Float64) as __common_expr_2, ts@0 as ts, 
inc_col@1 as inc_col]
 08)--------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, 
inc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
 
 query IIIIIIIIRR
@@ -2839,9 +2839,9 @@ logical_plan
 02)--Limit: skip=0, fetch=5
 03)----Sort: annotated_data_infinite.ts ASC NULLS LAST, fetch=5
 04)------Projection: sum(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING AS sum1, sum(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING AS sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING AS count1, COUNT(annotated_data_ [...]
-05)--------WindowAggr: windowExpr=[[sum({CAST(annotated_data_infinite.inc_col 
AS Int64)|{annotated_data_infinite.inc_col}} AS 
annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS 
LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING, 
COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC 
NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING]]
-06)----------WindowAggr: 
windowExpr=[[sum({CAST(annotated_data_infinite.inc_col AS 
Int64)|{annotated_data_infinite.inc_col}} AS annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING]]
-07)------------Projection: CAST(annotated_data_infinite.inc_col AS Int64) AS 
{CAST(annotated_data_infinite.inc_col AS 
Int64)|{annotated_data_infinite.inc_col}}, annotated_data_infinite.ts, 
annotated_data_infinite.inc_col
+05)--------WindowAggr: windowExpr=[[sum(__common_expr_1 AS 
annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS 
LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING, 
COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC 
NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING]]
+06)----------WindowAggr: windowExpr=[[sum(__common_expr_1 AS 
annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC 
NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, 
COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts 
DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING]]
+07)------------Projection: CAST(annotated_data_infinite.inc_col AS Int64) AS 
__common_expr_1, annotated_data_infinite.ts, annotated_data_infinite.inc_col
 08)--------------TableScan: annotated_data_infinite projection=[ts, inc_col]
 physical_plan
 01)ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1, 
count2@3 as count2]
@@ -2849,7 +2849,7 @@ physical_plan
 03)----ProjectionExec: expr=[sum(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@5 as sum1, sum(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING@3 as sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@6 as count1, COUNT(a [...]
 04)------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)) [...]
 05)--------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64( [...]
-06)----------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as 
{CAST(annotated_data_infinite.inc_col AS 
Int64)|{annotated_data_infinite.inc_col}}, ts@0 as ts, inc_col@1 as inc_col]
+06)----------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as 
__common_expr_1, ts@0 as ts, inc_col@1 as inc_col]
 07)------------StreamingTableExec: partition_sizes=1, projection=[ts, 
inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST]
 
 query IIII
@@ -2886,9 +2886,9 @@ logical_plan
 02)--Limit: skip=0, fetch=5
 03)----Sort: annotated_data_infinite.ts ASC NULLS LAST, fetch=5
 04)------Projection: sum(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING AS sum1, sum(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING AS sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING AS count1, COUNT(annotated_data_ [...]
-05)--------WindowAggr: windowExpr=[[sum({CAST(annotated_data_infinite.inc_col 
AS Int64)|{annotated_data_infinite.inc_col}} AS 
annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS 
LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING, 
COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC 
NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING]]
-06)----------WindowAggr: 
windowExpr=[[sum({CAST(annotated_data_infinite.inc_col AS 
Int64)|{annotated_data_infinite.inc_col}} AS annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING]]
-07)------------Projection: CAST(annotated_data_infinite.inc_col AS Int64) AS 
{CAST(annotated_data_infinite.inc_col AS 
Int64)|{annotated_data_infinite.inc_col}}, annotated_data_infinite.ts, 
annotated_data_infinite.inc_col
+05)--------WindowAggr: windowExpr=[[sum(__common_expr_1 AS 
annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS 
LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING, 
COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC 
NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING]]
+06)----------WindowAggr: windowExpr=[[sum(__common_expr_1 AS 
annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC 
NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, 
COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts 
DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING]]
+07)------------Projection: CAST(annotated_data_infinite.inc_col AS Int64) AS 
__common_expr_1, annotated_data_infinite.ts, annotated_data_infinite.inc_col
 08)--------------TableScan: annotated_data_infinite projection=[ts, inc_col]
 physical_plan
 01)ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1, 
count2@3 as count2]
@@ -2896,7 +2896,7 @@ physical_plan
 03)----ProjectionExec: expr=[sum(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@5 as sum1, sum(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING@3 as sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@6 as count1, COUNT(a [...]
 04)------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)) [...]
 05)--------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64( [...]
-06)----------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as 
{CAST(annotated_data_infinite.inc_col AS 
Int64)|{annotated_data_infinite.inc_col}}, ts@0 as ts, inc_col@1 as inc_col]
+06)----------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as 
__common_expr_1, ts@0 as ts, inc_col@1 as inc_col]
 07)------------StreamingTableExec: partition_sizes=1, projection=[ts, 
inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST]
 
 
@@ -2983,13 +2983,13 @@ EXPLAIN SELECT a, b, c,
 logical_plan
 01)Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.c, sum(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC 
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING AS sum1, 
sum(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NU [...]
 02)--Limit: skip=0, fetch=5
-03)----WindowAggr: windowExpr=[[sum({CAST(annotated_data_infinite2.c AS 
Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) PARTITION 
BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS 
LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC 
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
sum({CAST(annotated_data_infinite2.c AS Int64)|{annotated_data_infinite2.c}} AS 
annotated_data_infinite2.c) PARTITION BY [annotat [...]
-04)------WindowAggr: windowExpr=[[sum({CAST(annotated_data_infinite2.c AS 
Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) PARTITION 
BY [annotated_data_infinite2.b, annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
sum({CAST(annotated_data_infinite2.c AS Int64)|{annotated_data_infinite2.c}} AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotat 
[...]
-05)--------WindowAggr: windowExpr=[[sum({CAST(annotated_data_infinite2.c AS 
Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) PARTITION 
BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, sum({CAST(annotated_data_infinite2.c AS 
Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) PARTITION 
BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER [...]
-06)----------WindowAggr: windowExpr=[[sum({CAST(annotated_data_infinite2.c AS 
Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) PARTITION 
BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC 
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
sum({CAST(annotated_data_infinite2.c AS Int64)|{annotated_data_infinite2.c}} AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_i [...]
-07)------------WindowAggr: windowExpr=[[sum({CAST(annotated_data_infinite2.c 
AS Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
sum({CAST(annotated_data_infinite2.c AS Int64)|{annotated_data_infinite2.c}} AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, a [...]
-08)--------------WindowAggr: windowExpr=[[sum({CAST(annotated_data_infinite2.c 
AS Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, sum({CAST(annotated_data_infinite2.c AS 
Int64)|{annotated_data_infinite2.c}} AS annotated_data_infinite2.c) PARTITION 
BY [annotated_data_infinite2.a, annotated_data_infinite2.b] [...]
-09)----------------Projection: CAST(annotated_data_infinite2.c AS Int64) AS 
{CAST(annotated_data_infinite2.c AS Int64)|{annotated_data_infinite2.c}}, 
annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.c, annotated_data_infinite2.d
+03)----WindowAggr: windowExpr=[[sum(__common_expr_1 AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.a ASC NULLS LAST, annotated_data_infinite2.b ASC 
NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING 
AND 1 FOLLOWING, sum(__common_expr_1 AS annotated_data_infinite2.c) PARTITION 
BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS 
LAST, annotated_data_infinite2.b ASC NULLS LAS [...]
+04)------WindowAggr: windowExpr=[[sum(__common_expr_1 AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, 
annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, sum(__common_expr_1 AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN [...]
+05)--------WindowAggr: windowExpr=[[sum(__common_expr_1 AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, 
annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, sum(__common_expr_1 AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, 
annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING]]
+06)----------WindowAggr: windowExpr=[[sum(__common_expr_1 AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS 
LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, sum(__common_expr_1 AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infini [...]
+07)------------WindowAggr: windowExpr=[[sum(__common_expr_1 AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, sum(__common_expr_1 AS annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS B [...]
+08)--------------WindowAggr: windowExpr=[[sum(__common_expr_1 AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, sum(__common_expr_1 AS 
annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING]]
+09)----------------Projection: CAST(annotated_data_infinite2.c AS Int64) AS 
__common_expr_1, annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.c, annotated_data_infinite2.d
 10)------------------TableScan: annotated_data_infinite2 projection=[a, b, c, 
d]
 physical_plan
 01)ProjectionExec: expr=[a@1 as a, b@2 as b, c@3 as c, 
sum(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS 
LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING@9 as sum1, sum(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC N 
[...]
@@ -3000,7 +3000,7 @@ physical_plan
 06)----------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite2.c) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC 
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"sum(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS 
LAST, annotated_data_infinite2.c ASC NULLS LA [...]
 07)------------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite2.c) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"sum(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING [...]
 08)--------------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite2.c) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, d [...]
-09)----------------ProjectionExec: expr=[CAST(c@2 AS Int64) as 
{CAST(annotated_data_infinite2.c AS Int64)|{annotated_data_infinite2.c}}, a@0 
as a, b@1 as b, c@2 as c, d@3 as d]
+09)----------------ProjectionExec: expr=[CAST(c@2 AS Int64) as 
__common_expr_1, a@0 as a, b@1 as b, c@2 as c, d@3 as d]
 10)------------------StreamingTableExec: partition_sizes=1, projection=[a, b, 
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS 
LAST, c@2 ASC NULLS LAST]
 
 query IIIIIIIIIIIIIII
@@ -3052,13 +3052,13 @@ logical_plan
 01)Limit: skip=0, fetch=5
 02)--Sort: annotated_data_finite2.c ASC NULLS LAST, fetch=5
 03)----Projection: annotated_data_finite2.a, annotated_data_finite2.b, 
annotated_data_finite2.c, sum(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING AS sum1, 
sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, 
annotated_ [...]
-04)------WindowAggr: windowExpr=[[sum({CAST(annotated_data_finite2.c AS 
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST, 
annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
sum({CAST(annotated_data_finite2.c AS Int64)|{annotated_data_finite2.c}} AS 
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] [...]
-05)--------WindowAggr: windowExpr=[[sum({CAST(annotated_data_finite2.c AS 
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d] 
ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 
1 FOLLOWING, sum({CAST(annotated_data_finite2.c AS 
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.b, annotated_data_finite2.a, a [...]
-06)----------WindowAggr: windowExpr=[[sum({CAST(annotated_data_finite2.c AS 
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, sum({CAST(annotated_data_finite2.c AS 
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY [annotated_data_ 
[...]
-07)------------WindowAggr: windowExpr=[[sum({CAST(annotated_data_finite2.c AS 
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, 
sum({CAST(annotated_data_finite2.c AS Int64)|{annotated_data_finite2.c}} AS 
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotate [...]
-08)--------------WindowAggr: windowExpr=[[sum({CAST(annotated_data_finite2.c 
AS Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] 
ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 
1 FOLLOWING, sum({CAST(annotated_data_finite2.c AS 
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite [...]
-09)----------------WindowAggr: windowExpr=[[sum({CAST(annotated_data_finite2.c 
AS Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, sum({CAST(annotated_data_finite2.c AS 
Int64)|{annotated_data_finite2.c}} AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated [...]
-10)------------------Projection: CAST(annotated_data_finite2.c AS Int64) AS 
{CAST(annotated_data_finite2.c AS Int64)|{annotated_data_finite2.c}}, 
annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.c, 
annotated_data_finite2.d
+04)------WindowAggr: windowExpr=[[sum(__common_expr_1 AS 
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.a ASC NULLS LAST, annotated_data_finite2.b ASC NULLS 
LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, sum(__common_expr_1 AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST, 
annotated_data_finite2.b ASC NULLS LAST, annotated_dat [...]
+05)--------WindowAggr: windowExpr=[[sum(__common_expr_1 AS 
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, 
annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, sum(__common_expr_1 AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d] 
ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN CURRENT ROW AND 
1 [...]
+06)----------WindowAggr: windowExpr=[[sum(__common_expr_1 AS 
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, 
annotated_data_finite2.a] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] 
ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, sum(__common_expr_1 AS 
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, 
annotated_data_finite2.a] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] 
ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING]]
+07)------------WindowAggr: windowExpr=[[sum(__common_expr_1 AS 
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, 
annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, sum(__common_expr_1 AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LA 
[...]
+08)--------------WindowAggr: windowExpr=[[sum(__common_expr_1 AS 
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING, sum(__common_expr_1 AS annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] 
ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING 
[...]
+09)----------------WindowAggr: windowExpr=[[sum(__common_expr_1 AS 
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] 
ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, sum(__common_expr_1 AS 
annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] 
ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING]]
+10)------------------Projection: CAST(annotated_data_finite2.c AS Int64) AS 
__common_expr_1, annotated_data_finite2.a, annotated_data_finite2.b, 
annotated_data_finite2.c, annotated_data_finite2.d
 11)--------------------TableScan: annotated_data_finite2 projection=[a, b, c, 
d]
 physical_plan
 01)GlobalLimitExec: skip=0, fetch=5
@@ -3075,7 +3075,7 @@ physical_plan
 12)----------------------BoundedWindowAggExec: 
wdw=[sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING: Ok(Field { name: "sum(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] 
ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 
1 FOL [...]
 13)------------------------SortExec: expr=[a@1 ASC NULLS LAST,b@2 ASC NULLS 
LAST,d@4 ASC NULLS LAST,c@3 ASC NULLS LAST], preserve_partitioning=[false]
 14)--------------------------BoundedWindowAggExec: 
wdw=[sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] 
ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] 
ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, 
dict_ [...]
-15)----------------------------ProjectionExec: expr=[CAST(c@2 AS Int64) as 
{CAST(annotated_data_finite2.c AS Int64)|{annotated_data_finite2.c}}, a@0 as a, 
b@1 as b, c@2 as c, d@3 as d]
+15)----------------------------ProjectionExec: expr=[CAST(c@2 AS Int64) as 
__common_expr_1, a@0 as a, b@1 as b, c@2 as c, d@3 as d]
 16)------------------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, 
c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS 
LAST], has_header=true
 
 query IIIIIIIIIIIIIII
@@ -3241,21 +3241,21 @@ FROM annotated_data_infinite2;
 ----
 logical_plan
 01)Projection: sum(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW AS sum1, sum(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW AS sum2, sum(annotated_data_infinite2.a) PARTITION BY [ann [...]
-02)--WindowAggr: windowExpr=[[sum({CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-03)----Projection: {CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}}, annotated_data_infinite2.a, 
annotated_data_infinite2.d, sum(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW, sum(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2. [...]
-04)------WindowAggr: windowExpr=[[sum({CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
-05)--------WindowAggr: windowExpr=[[sum({CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
-06)----------WindowAggr: windowExpr=[[sum({CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
-07)------------Projection: CAST(annotated_data_infinite2.a AS Int64) AS 
{CAST(annotated_data_infinite2.a AS Int64)|{annotated_data_infinite2.a}}, 
annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.c, annotated_data_infinite2.d
+02)--WindowAggr: windowExpr=[[sum(__common_expr_1 AS 
annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
+03)----Projection: __common_expr_1, annotated_data_infinite2.a, 
annotated_data_infinite2.d, sum(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW, sum(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND C [...]
+04)------WindowAggr: windowExpr=[[sum(__common_expr_1 AS 
annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, 
annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+05)--------WindowAggr: windowExpr=[[sum(__common_expr_1 AS 
annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+06)----------WindowAggr: windowExpr=[[sum(__common_expr_1 AS 
annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+07)------------Projection: CAST(annotated_data_infinite2.a AS Int64) AS 
__common_expr_1, annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.c, annotated_data_infinite2.d
 08)--------------TableScan: annotated_data_infinite2 projection=[a, b, c, d]
 physical_plan
 01)ProjectionExec: expr=[sum(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@3 as sum1, sum(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@5 as sum2, sum(annotated_data_infinite2.a) PAR [...]
 02)--BoundedWindowAggExec: wdw=[sum(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] 
ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }) [...]
-03)----ProjectionExec: expr=[{CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}}@0 as {CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}}, a@1 as a, d@4 as d, 
sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as 
sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotat [...]
+03)----ProjectionExec: expr=[__common_expr_1@0 as __common_expr_1, a@1 as a, 
d@4 as d, sum(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@5 as sum(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CU [...]
 04)------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW: Ok(Field { name: "sum(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW", data_type: Int64, nullabl [...]
 05)--------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite2.a) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW: Ok(Field { name: "sum(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW", data_type: Int64, nulla [...]
 06)----------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite2.a) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW: Ok(Field { name: "sum(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW", data_type: Int64, nul [...]
-07)------------ProjectionExec: expr=[CAST(a@0 AS Int64) as 
{CAST(annotated_data_infinite2.a AS Int64)|{annotated_data_infinite2.a}}, a@0 
as a, b@1 as b, c@2 as c, d@3 as d]
+07)------------ProjectionExec: expr=[CAST(a@0 AS Int64) as __common_expr_1, 
a@0 as a, b@1 as b, c@2 as c, d@3 as d]
 08)--------------StreamingTableExec: partition_sizes=1, projection=[a, b, c, 
d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS 
LAST, c@2 ASC NULLS LAST]
 
 statement ok
@@ -3272,29 +3272,29 @@ FROM annotated_data_infinite2;
 ----
 logical_plan
 01)Projection: sum(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW AS sum1, sum(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW AS sum2, sum(annotated_data_infinite2.a) PARTITION BY [ann [...]
-02)--WindowAggr: windowExpr=[[sum({CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
-03)----Projection: {CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}}, annotated_data_infinite2.a, 
annotated_data_infinite2.d, sum(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW, sum(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2. [...]
-04)------WindowAggr: windowExpr=[[sum({CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
-05)--------WindowAggr: windowExpr=[[sum({CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
-06)----------WindowAggr: windowExpr=[[sum({CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}} AS annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
-07)------------Projection: CAST(annotated_data_infinite2.a AS Int64) AS 
{CAST(annotated_data_infinite2.a AS Int64)|{annotated_data_infinite2.a}}, 
annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.c, annotated_data_infinite2.d
+02)--WindowAggr: windowExpr=[[sum(__common_expr_1 AS 
annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
+03)----Projection: __common_expr_1, annotated_data_infinite2.a, 
annotated_data_infinite2.d, sum(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW, sum(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND C [...]
+04)------WindowAggr: windowExpr=[[sum(__common_expr_1 AS 
annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, 
annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+05)--------WindowAggr: windowExpr=[[sum(__common_expr_1 AS 
annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+06)----------WindowAggr: windowExpr=[[sum(__common_expr_1 AS 
annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+07)------------Projection: CAST(annotated_data_infinite2.a AS Int64) AS 
__common_expr_1, annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.c, annotated_data_infinite2.d
 08)--------------TableScan: annotated_data_infinite2 projection=[a, b, c, d]
 physical_plan
 01)ProjectionExec: expr=[sum(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@3 as sum1, sum(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@5 as sum2, sum(annotated_data_infinite2.a) PAR [...]
 02)--BoundedWindowAggExec: wdw=[sum(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] 
ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }) [...]
 03)----CoalesceBatchesExec: target_batch_size=4096
-04)------RepartitionExec: partitioning=Hash([d@2], 2), input_partitions=2, 
preserve_order=true, sort_exprs={CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}}@0 ASC NULLS LAST,a@1 ASC NULLS LAST
-05)--------ProjectionExec: expr=[{CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}}@0 as {CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}}, a@1 as a, d@4 as d, 
sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as 
sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, ann 
[...]
+04)------RepartitionExec: partitioning=Hash([d@2], 2), input_partitions=2, 
preserve_order=true, sort_exprs=__common_expr_1@0 ASC NULLS LAST,a@1 ASC NULLS 
LAST
+05)--------ProjectionExec: expr=[__common_expr_1@0 as __common_expr_1, a@1 as 
a, d@4 as d, sum(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@5 as sum(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AN [...]
 06)----------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite2.a) 
PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW: Ok(Field { name: "sum(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW", data_type: Int64, nul [...]
 07)------------CoalesceBatchesExec: target_batch_size=4096
-08)--------------RepartitionExec: partitioning=Hash([b@2, a@1], 2), 
input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2 ASC 
NULLS LAST,c@3 ASC NULLS LAST,{CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}}@0 ASC NULLS LAST
+08)--------------RepartitionExec: partitioning=Hash([b@2, a@1], 2), 
input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2 ASC 
NULLS LAST,c@3 ASC NULLS LAST,__common_expr_1@0 ASC NULLS LAST
 09)----------------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite2.a) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW: Ok(Field { name: "sum(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW", data_type: Int6 [...]
 10)------------------CoalesceBatchesExec: target_batch_size=4096
-11)--------------------RepartitionExec: partitioning=Hash([a@1, d@4], 2), 
input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2 ASC 
NULLS LAST,c@3 ASC NULLS LAST,{CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}}@0 ASC NULLS LAST
+11)--------------------RepartitionExec: partitioning=Hash([a@1, d@4], 2), 
input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2 ASC 
NULLS LAST,c@3 ASC NULLS LAST,__common_expr_1@0 ASC NULLS LAST
 12)----------------------BoundedWindowAggExec: 
wdw=[sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type [...]
 13)------------------------CoalesceBatchesExec: target_batch_size=4096
-14)--------------------------RepartitionExec: partitioning=Hash([a@1, b@2], 
2), input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2 
ASC NULLS LAST,c@3 ASC NULLS LAST,{CAST(annotated_data_infinite2.a AS 
Int64)|{annotated_data_infinite2.a}}@0 ASC NULLS LAST
-15)----------------------------ProjectionExec: expr=[CAST(a@0 AS Int64) as 
{CAST(annotated_data_infinite2.a AS Int64)|{annotated_data_infinite2.a}}, a@0 
as a, b@1 as b, c@2 as c, d@3 as d]
+14)--------------------------RepartitionExec: partitioning=Hash([a@1, b@2], 
2), input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2 
ASC NULLS LAST,c@3 ASC NULLS LAST,__common_expr_1@0 ASC NULLS LAST
+15)----------------------------ProjectionExec: expr=[CAST(a@0 AS Int64) as 
__common_expr_1, a@0 as a, b@1 as b, c@2 as c, d@3 as d]
 16)------------------------------RepartitionExec: 
partitioning=RoundRobinBatch(2), input_partitions=1
 17)--------------------------------StreamingTableExec: partition_sizes=1, 
projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST]
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to