This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new da63d34112 Mismatch in MemTable of Select Into when projecting on aggregate window functions (#6566)
da63d34112 is described below

commit da63d34112b54070b5ec83c613da06ddb25e0a64
Author: Berkay Şahin <[email protected]>
AuthorDate: Wed Jul 5 21:50:51 2023 +0300

    Mismatch in MemTable of Select Into when projecting on aggregate window functions (#6566)
    
    * Schema check of partitions and input plan is removed for newly registered tables.
    
    * minor changes
    
    * In Select Into queries, aggregate windows are re-aliased with physical_name()
    
    * debugging
    
    * display_name() output is simplified for window functions
    
    * Windows are displayed in long format
    
    * Window names in tests are edited
    
    * Create table as test is added
    
    ---------
    
    Co-authored-by: Mustafa Akur <[email protected]>
---
 datafusion/core/src/physical_planner.rs            |   2 +-
 .../core/tests/sqllogictests/test_files/ddl.slt    |  30 +++-
 .../core/tests/sqllogictests/test_files/insert.slt |  10 +-
 .../core/tests/sqllogictests/test_files/window.slt | 194 ++++++++++----------
 .../user_defined/user_defined_window_functions.rs  | 196 ++++++++++-----------
 5 files changed, 230 insertions(+), 202 deletions(-)

diff --git a/datafusion/core/src/physical_planner.rs 
b/datafusion/core/src/physical_planner.rs
index a4aab95635..f00f5e0d5e 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -1597,7 +1597,7 @@ pub fn create_window_expr(
     // unpack aliased logical expressions, e.g. "sum(col) over () as total"
     let (name, e) = match e {
         Expr::Alias(Alias { expr, name, .. }) => (name.clone(), expr.as_ref()),
-        _ => (physical_name(e)?, e),
+        _ => (e.display_name()?, e),
     };
     create_window_expr_with_name(
         e,
diff --git a/datafusion/core/tests/sqllogictests/test_files/ddl.slt 
b/datafusion/core/tests/sqllogictests/test_files/ddl.slt
index 1cf67be3a2..954bc4d991 100644
--- a/datafusion/core/tests/sqllogictests/test_files/ddl.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/ddl.slt
@@ -348,6 +348,9 @@ SELECT * FROM new_table;
 statement ok
 DROP TABLE new_table
 
+statement ok
+DROP TABLE my_table;
+
 # create_table_with_schema_as_multiple_values
 statement ok
 CREATE TABLE test_table(c1 int, c2 float, c3 varchar) AS VALUES(1, 2, 
'hello'),(2, 1, 'there'),(3, 0, '!');
@@ -362,7 +365,32 @@ SELECT * FROM new_table
 2 1 there
 
 statement ok
-DROP TABLE my_table;
+DROP TABLE new_table;
+
+# Select into without alias names of window aggregates
+statement ok
+SELECT SUM(c1) OVER(ORDER BY c2), c2, c3 INTO new_table FROM test_table
+
+query IRT
+SELECT * FROM new_table
+----
+3 0 !
+5 1 there
+6 2 hello
+
+statement ok
+DROP TABLE new_table;
+
+# Create table as without alias names of window aggregates
+statement ok
+CREATE TABLE new_table AS SELECT SUM(c1) OVER(ORDER BY c2), c2, c3 FROM 
test_table
+
+query IRT
+SELECT * FROM new_table
+----
+3 0 !
+5 1 there
+6 2 hello
 
 statement ok
 DROP TABLE new_table;
diff --git a/datafusion/core/tests/sqllogictests/test_files/insert.slt 
b/datafusion/core/tests/sqllogictests/test_files/insert.slt
index c710859a7b..faa519834c 100644
--- a/datafusion/core/tests/sqllogictests/test_files/insert.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/insert.slt
@@ -67,8 +67,8 @@ physical_plan
 InsertExec: sink=MemoryTable (partitions=1)
 --ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as field1, COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as field2]
 ----SortPreservingMergeExec: [c1@2 ASC NULLS LAST]
-------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4), 
COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING@4 as COUNT(UInt8(1)), c1@0 as c1]
---------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { 
name: "SUM(aggregate_test_100.c4)", data_type: Int64, nullable: true, dict_id: 
0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, 
COUNT(UInt8(1)): Ok(Field { name: "COUNT(UInt8(1))", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Rows, start_bound: Prec [...]
+------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4) PARTITION 
BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as COUNT [...]
+--------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bou [...]
 ----------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST]
 ------------CoalesceBatchesExec: target_batch_size=8192
 --------------RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8
@@ -128,7 +128,7 @@ physical_plan
 InsertExec: sink=MemoryTable (partitions=1)
 --CoalescePartitionsExec
 ----ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2]
-------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: 
"SUM(aggregate_test_100.c4)", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, 
COUNT(UInt8(1)): Ok(Field { name: "COUNT(UInt8(1))", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Rows, start_bound: Preced [...]
+------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound [...]
 --------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST]
 ----------CoalesceBatchesExec: target_batch_size=8192
 ------------RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8
@@ -179,8 +179,8 @@ InsertExec: sink=MemoryTable (partitions=8)
 --ProjectionExec: expr=[a1@0 as a1, a2@1 as a2]
 ----SortPreservingMergeExec: [c1@2 ASC NULLS LAST]
 ------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as a1, COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as a2, c1@0 as c1]
---------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { 
name: "SUM(aggregate_test_100.c4)", data_type: Int64, nullable: true, dict_id: 
0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, 
COUNT(UInt8(1)): Ok(Field { name: "COUNT(UInt8(1))", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Rows, start_bound: Prec [...]
-----------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST]
+ --------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bo [...]
+ ----------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST]
 ------------CoalesceBatchesExec: target_batch_size=8192
 --------------RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8
 ----------------RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt 
b/datafusion/core/tests/sqllogictests/test_files/window.slt
index d77df127a8..09339d7e49 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -359,7 +359,7 @@ SortPreservingMergeExec: [b@0 ASC NULLS LAST]
 --ProjectionExec: expr=[b@0 as b, MAX(d.a)@1 as max_a, MAX(d.seq)@2 as 
MAX(d.seq)]
 ----AggregateExec: mode=Single, gby=[b@2 as b], aggr=[MAX(d.a), MAX(d.seq)], 
ordering_mode=FullyOrdered
 ------ProjectionExec: expr=[ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC 
NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as seq, a@0 as 
a, b@1 as b]
---------BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
"ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+--------BoundedWindowAggExec: wdw=[ROW_NUMBER() PARTITION BY [s.b] ORDER BY 
[s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: 
Ok(Field { name: "ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, 
nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: 
CurrentRow }], mode=[Sorted]
 ----------SortExec: expr=[b@1 ASC NULLS LAST,a@0 ASC NULLS LAST]
 ------------CoalesceBatchesExec: target_batch_size=8192
 --------------RepartitionExec: partitioning=Hash([b@1], 4), input_partitions=4
@@ -1213,9 +1213,9 @@ Projection: aggregate_test_100.c9, 
SUM(aggregate_test_100.c9) ORDER BY [aggregat
 --------TableScan: aggregate_test_100 projection=[c8, c9]
 physical_plan
 ProjectionExec: expr=[c9@0 as c9, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as sum2]
---BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
-----ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as 
SUM(aggregate_test_100.c9)]
-------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+--BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW: Ok(Field { name: "SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode= [...]
+----ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as 
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, 
aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW]
+------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, 
aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { uni [...]
 --------SortExec: expr=[c9@1 ASC NULLS LAST,c8@0 ASC NULLS LAST]
 ----------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c8, 
c9], has_header=true
 
@@ -1233,10 +1233,10 @@ Projection: aggregate_test_100.c2, 
MAX(aggregate_test_100.c9) ORDER BY [aggregat
 ------WindowAggr: windowExpr=[[MIN(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
 --------TableScan: aggregate_test_100 projection=[c2, c9]
 physical_plan
-ProjectionExec: expr=[c2@0 as c2, MAX(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@3 as MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS 
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@4 as 
SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as MIN(aggregate_t [...]
---WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]
-----BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: 
"MAX(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }], mode=[Sorted]
-------BoundedWindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: 
"MIN(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+ProjectionExec: expr=[c2@0 as c2, MAX(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@3 as MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC 
NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, 
SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING@4 as SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND 
UNBOUNDED FOLLOWING, MIN(aggregate_test_100.c9) O [...]
+--WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(aggregate_test_100.c9) 
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: 
Following(UInt64(NULL)) }]
+----BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW: Ok(Field { name: "MAX(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Int8(NULL)), end_bound: CurrentRow }], mode= [...]
+------BoundedWindowAggExec: wdw=[MIN(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { uni [...]
 --------SortExec: expr=[c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]
 ----------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, 
c9], has_header=true
 
@@ -1257,11 +1257,11 @@ Sort: aggregate_test_100.c2 ASC NULLS LAST
 ----------TableScan: aggregate_test_100 projection=[c2, c9]
 physical_plan
 SortExec: expr=[c2@0 ASC NULLS LAST]
---ProjectionExec: expr=[c2@0 as c2, MAX(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as 
MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING@4 as SUM(aggregate_test_100.c9), 
MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECED [...]
-----WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]
-------BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: 
"MAX(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+--ProjectionExec: expr=[c2@0 as c2, MAX(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as 
MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, 
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW, SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND 
UNBOUNDED FOLLOWING@4 as SUM(aggregate_test_100.c9) ROWS BET [...]
+----WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(aggregate_test_100.c9) 
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: 
Following(UInt64(NULL)) }]
+------BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, 
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { uni [...]
 --------SortExec: expr=[c9@1 ASC NULLS LAST,c2@0 ASC NULLS LAST]
-----------BoundedWindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { 
name: "MIN(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 
0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+----------BoundedWindowAggExec: wdw=[MIN(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { [...]
 ------------SortExec: expr=[c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]
 --------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, 
c9], has_header=true
 
@@ -1282,13 +1282,13 @@ Projection: SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1, aggr
 ------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 --------TableScan: aggregate_test_100 projection=[c1, c2, c4]
 physical_plan
-ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@2 as 
SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as COUNT(UInt8(1))]
---BoundedWindowAggExec: wdw=[COUNT(UInt8(1)): Ok(Field { name: 
"COUNT(UInt8(1))", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }], 
mode=[Sorted]
+ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@2 as 
SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, 
aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEE [...]
+--BoundedWindowAggExec: wdw=[COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "COUNT(UInt8(1)) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS 
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), en [...]
 ----SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]
 ------CoalesceBatchesExec: target_batch_size=4096
 --------RepartitionExec: partitioning=Hash([c1@0], 2), input_partitions=2
-----------ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, 
SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, 
aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4)]
-------------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { 
name: "SUM(aggregate_test_100.c4)", data_type: Int64, nullable: true, dict_id: 
0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }], 
mode=[Sorted]
+----------ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, 
SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, 
aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4) PARTITION 
BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY 
[aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]
+------------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, 
aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata:  [...]
 --------------SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]
 ----------------CoalesceBatchesExec: target_batch_size=4096
 ------------------RepartitionExec: partitioning=Hash([c1@0, c2@1], 2), 
input_partitions=2
@@ -1315,8 +1315,8 @@ Projection: aggregate_test_100.c9, 
SUM(aggregate_test_100.c9) ORDER BY [aggregat
 physical_plan
 ProjectionExec: expr=[c9@0 as c9, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 
DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum2]
 --GlobalLimitExec: skip=0, fetch=5
-----BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }], 
mode=[Sorted]
-------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], 
mode=[Sorted]
+----BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING: Ok(Field { name: "SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: 
Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }], mode=[Sorted]
+------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING: Ok(Field { name: "SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: 
Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
 --------SortExec: expr=[c9@0 DESC]
 ----------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], 
has_header=true
 
@@ -1356,8 +1356,8 @@ Projection: aggregate_test_100.c9, 
FIRST_VALUE(aggregate_test_100.c9) ORDER BY [
 physical_plan
 ProjectionExec: expr=[c9@0 as c9, FIRST_VALUE(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING@4 as fv1, FIRST_VALUE(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING@1 as fv2, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@5 as lag1, LAG(aggregate_test_100.c9,Int6 [...]
 --GlobalLimitExec: skip=0, fetch=5
-----BoundedWindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { 
name: "FIRST_VALUE(aggregate_test_100.c9)", data_type: UInt64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) 
}, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: 
"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))", data_type: UInt64, 
nullable: true, dict_id: 0, dict_is_orde [...]
-------BoundedWindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field 
{ name: "FIRST_VALUE(aggregate_test_100.c9)", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) 
}, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: 
"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))", data_type: UInt64, 
nullable: true, dict_id: 0, dict_is_or [...]
+----BoundedWindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING: Ok(Field { name: "FIRST_VALUE(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: 
Preceding(UInt64(5)), end_bound: Following(UInt64(1)) },  [...]
+------BoundedWindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING: Ok(Field { name: "FIRST_VALUE(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: 
Preceding(UInt64(1)), end_bound: Following(UInt64(5 [...]
 --------SortExec: expr=[c9@0 DESC]
 ----------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], 
has_header=true
 
@@ -1399,9 +1399,9 @@ Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY 
[aggregate_test_100.c9
 physical_plan
 ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as rn1, ROW_NUMBER() 
ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 
5 FOLLOWING@1 as rn2]
 --GlobalLimitExec: skip=0, fetch=5
-----BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", 
data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Rows, start_bound: 
Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
+----BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING: Ok(Field { name: 
"ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 
PRECEDING AND 5 FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], 
mode=[Sorted]
 ------SortExec: expr=[c9@0 ASC NULLS LAST]
---------BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
"ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], 
mode=[Sorted]
+--------BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING: Ok(Field { name: "ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC 
NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: UInt64, 
nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: 
Following(UInt64(5)) }], mode=[Sorted]
 ----------SortExec: expr=[c9@0 DESC]
 ------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], 
has_header=true
 
@@ -1441,10 +1441,10 @@ Projection: aggregate_test_100.c9, 
SUM(aggregate_test_100.c9) ORDER BY [aggregat
 physical_plan
 ProjectionExec: expr=[c9@2 as c9, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS LAST, 
aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING@5 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 
DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS FIRST] ROWS BETWEEN 1 
PRECEDING AND 5 FOLLOWING@3 as sum2, ROW_NUMBER() ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDIN [...]
 --GlobalLimitExec: skip=0, fetch=5
-----BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], 
mode=[Sorted]
+----BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS LAST, 
aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING: 
Ok(Field { name: "SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 
ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC 
NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: UInt64, 
nullable: true, dict_id: 0, dict_is_o [...]
 ------SortExec: expr=[c9@2 ASC NULLS LAST,c1@0 ASC NULLS LAST,c2@1 ASC NULLS 
LAST]
---------BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
"ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], 
mode=[Sorted]
-----------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { 
name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 
0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], 
mode=[Sorted]
+--------BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING: Ok(Field { name: "ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC 
NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: UInt64, 
nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: 
Following(UInt64(5)) }], mode=[Sorted]
+----------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS 
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING: Ok(Field { name: 
"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST, 
aggregate_test_100.c1 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Ro [...]
 ------------SortExec: expr=[c9@2 DESC,c1@0 DESC]
 --------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, 
c2, c9], has_header=true
 
@@ -1523,19 +1523,19 @@ Projection: SUM(null_cases.c1) ORDER BY [null_cases.c3 
ASC NULLS LAST] RANGE BET
 physical_plan
 ProjectionExec: expr=[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS 
LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@18 as a, SUM(null_cases.c1) 
ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 
FOLLOWING@18 as b, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] 
RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@3 as c, SUM(null_cases.c1) ORDER BY 
[null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@11 
as d, SUM(null_cases.c1) O [...]
 --GlobalLimitExec: skip=0, fetch=5
-----WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: 
"SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, 
SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), [...]
-------ProjectionExec: expr=[c1@0 as c1, c3@2 as c3, SUM(null_cases.c1) ORDER 
BY [null_cases.c3 DESC NULLS FIRST, null_cases.c1 ASC NULLS LAST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW@3 as SUM(null_cases.c1), SUM(null_cases.c1) 
ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 
FOLLOWING@4 as SUM(null_cases.c1), SUM(null_cases.c1) ORDER BY [null_cases.c3 
DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as 
SUM(null_cases.c1), SUM(null [...]
---------BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: 
"SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+----WindowAggExec: wdw=[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS 
LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING: Ok(Field { name: 
"SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 
PRECEDING AND 11 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, 
SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NU [...]
+------ProjectionExec: expr=[c1@0 as c1, c3@2 as c3, SUM(null_cases.c1) ORDER 
BY [null_cases.c3 DESC NULLS FIRST, null_cases.c1 ASC NULLS LAST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW@3 as SUM(null_cases.c1) ORDER BY 
[null_cases.c3 DESC NULLS FIRST, null_cases.c1 ASC NULLS LAST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3 
DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@4 as 
SUM(null_cases.c1) ORDER BY [null_cases.c3 [...]
+--------BoundedWindowAggExec: wdw=[SUM(null_cases.c1) ORDER BY [null_cases.c3 
ASC NULLS LAST, null_cases.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW: Ok(Field { name: "SUM(null_cases.c1) ORDER BY [null_cases.c3 
ASC NULLS LAST, null_cases.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int64(NULL)), [...]
 ----------SortExec: expr=[c3@2 ASC NULLS LAST,c2@1 ASC NULLS LAST]
-------------BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: 
"SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+------------BoundedWindowAggExec: wdw=[SUM(null_cases.c1) ORDER BY 
[null_cases.c3 ASC NULLS LAST, null_cases.c1 ASC NULLS FIRST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(null_cases.c1) ORDER 
BY [null_cases.c3 ASC NULLS LAST, null_cases.c1 ASC NULLS FIRST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Int64(N [...]
 --------------SortExec: expr=[c3@2 ASC NULLS LAST,c1@0 ASC]
-----------------BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { 
name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+----------------BoundedWindowAggExec: wdw=[SUM(null_cases.c1) ORDER BY 
[null_cases.c3 ASC NULLS LAST, null_cases.c1 DESC NULLS FIRST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(null_cases.c1) ORDER 
BY [null_cases.c3 ASC NULLS LAST, null_cases.c1 DESC NULLS FIRST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(I [...]
 ------------------SortExec: expr=[c3@2 ASC NULLS LAST,c1@0 DESC]
---------------------WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: 
"SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int64(11)), end_bound: Following(Int64(10)) }, 
SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Range, start_bound: Current [...]
-----------------------WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: 
"SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, 
SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Range, start_bound: Prece [...]
+--------------------WindowAggExec: wdw=[SUM(null_cases.c1) ORDER BY 
[null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING: 
Ok(Field { name: "SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] 
RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING", data_type: Int64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Int64(11)), end_bound: 
Following(Int64(10)) }, SUM(null_cases.c1) ORDER BY [nu [...]
+----------------------WindowAggExec: wdw=[SUM(null_cases.c1) ORDER BY 
[null_cases.c3 DESC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING: 
Ok(Field { name: "SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] 
RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING", data_type: Int64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Int64(10)), end_bound: 
Following(Int64(11)) }, SUM(null_cases.c1) ORDER BY [ [...]
 ------------------------SortExec: expr=[c3@2 DESC NULLS LAST]
---------------------------WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { 
name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, 
SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Range, start_bound: P [...]
-----------------------------BoundedWindowAggExec: wdw=[SUM(null_cases.c1): 
Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }], 
mode=[Sorted]
+--------------------------WindowAggExec: wdw=[SUM(null_cases.c1) ORDER BY 
[null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING: 
Ok(Field { name: "SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] 
RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING", data_type: Int64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Int64(10)), end_bound: 
Following(Int64(11)) }, SUM(null_cases.c1) ORDE [...]
+----------------------------BoundedWindowAggExec: wdw=[SUM(null_cases.c1) 
ORDER BY [null_cases.c3 DESC NULLS FIRST, null_cases.c1 ASC NULLS LAST] RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST, null_cases.c1 ASC 
NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: 
Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), 
frame: WindowFrame { units: Range, start_bound: [...]
 ------------------------------SortExec: expr=[c3@2 DESC,c1@0 ASC NULLS LAST]
 --------------------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/null_cases.csv]]}, projection=[c1, 
c2, c3], has_header=true
 
@@ -1609,8 +1609,8 @@ Projection: aggregate_test_100.c9, 
SUM(aggregate_test_100.c9) ORDER BY [aggregat
 physical_plan
 ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST] 
ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS 
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]
 --GlobalLimitExec: skip=0, fetch=5
-----BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], 
mode=[Sorted]
-------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], 
mode=[Sorted]
+ ----BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS 
BETWEEN 1 PRECEDING AND 5 FOLLOWING: Ok(Field { name: 
"SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, start_b [...]
+------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST] 
ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING: Ok(Field { name: 
"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, 
aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, star [...]
 --------SortExec: expr=[c1@0 ASC NULLS LAST,c9@1 DESC]
 ----------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, 
c9], has_header=true
 
@@ -1653,8 +1653,8 @@ Projection: aggregate_test_100.c9, 
SUM(aggregate_test_100.c9) PARTITION BY [aggr
 physical_plan
 ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum1, SUM(aggregate_test_100.c9) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS 
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as sum2]
 --GlobalLimitExec: skip=0, fetch=5
-----BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }], 
mode=[Sorted]
-------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], 
mode=[Sorted]
+----BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 5 FOLLOWING: Ok(Field { name: 
"SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: [...]
+------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS 
BETWEEN 1 PRECEDING AND 5 FOLLOWING: Ok(Field { name: 
"SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, start_ [...]
 --------SortExec: expr=[c1@0 ASC NULLS LAST,c9@1 DESC]
 ----------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, 
c9], has_header=true
 
@@ -1698,9 +1698,9 @@ Projection: aggregate_test_100.c3, 
SUM(aggregate_test_100.c9) ORDER BY [aggregat
 physical_plan
 ProjectionExec: expr=[c3@0 as c3, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum1, 
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum2]
 --GlobalLimitExec: skip=0, fetch=5
-----WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: CurrentRow, end_bound: Following(Int16(NULL)) }]
-------ProjectionExec: expr=[c3@1 as c3, c4@2 as c4, c9@3 as c9, 
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, 
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@4 as SUM(aggregate_test_100.c9)]
---------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { 
name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 
0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int16(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+----WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW: Ok(Field { name: "SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, me [...]
+------ProjectionExec: expr=[c3@1 as c3, c4@2 as c4, c9@3 as c9, 
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, 
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@4 as SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, 
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN U [...]
+--------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, 
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECED [...]
 ----------SortExec: expr=[c3@1 + c4@2 DESC,c9@3 DESC,c2@0 ASC NULLS LAST]
 ------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, 
c3, c4, c9], has_header=true
 
@@ -1794,13 +1794,13 @@ physical_plan
 GlobalLimitExec: skip=0, fetch=5
 --SortPreservingMergeExec: [c3@0 ASC NULLS LAST], fetch=5
 ----ProjectionExec: expr=[c3@0 as c3, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS 
FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@2 as sum1, SUM(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum2]
-------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c3] ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { u [...]
 --------SortExec: expr=[c3@0 ASC NULLS LAST,c9@1 DESC]
 ----------CoalesceBatchesExec: target_batch_size=4096
 ------------RepartitionExec: partitioning=Hash([c3@0], 2), input_partitions=2
 --------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-----------------ProjectionExec: expr=[c3@1 as c3, c9@2 as c9, 
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as 
SUM(aggregate_test_100.c9)]
-------------------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Int16(NULL)), end_bound: CurrentRow }], 
mode=[Sorted]
+----------------ProjectionExec: expr=[c3@1 as c3, c9@2 as c9, 
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as 
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]
+------------------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER 
BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS 
FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW: Ok(Field { name: "SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS 
FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW", data_type: UInt6 [...]
 --------------------SortExec: expr=[c3@1 DESC,c9@2 DESC,c2@0 ASC NULLS LAST]
 ----------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, 
c3, c9], has_header=true
 
@@ -1836,7 +1836,7 @@ Sort: aggregate_test_100.c1 ASC NULLS LAST
 physical_plan
 SortPreservingMergeExec: [c1@0 ASC NULLS LAST]
 --ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY 
[aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING@1 as rn1]
-----BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", 
data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Rows, start_bound: 
Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }], mode=[Sorted]
+----BoundedWindowAggExec: wdw=[ROW_NUMBER() PARTITION BY 
[aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING: Ok(Field { name: "ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] 
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, 
nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: 
Following(UInt64(NULL)) }], mode=[Sorted]
 ------SortExec: expr=[c1@0 ASC NULLS LAST]
 --------CoalesceBatchesExec: target_batch_size=4096
 ----------RepartitionExec: partitioning=Hash([c1@0], 2), input_partitions=2
@@ -1963,7 +1963,7 @@ physical_plan
 SortExec: expr=[c1@0 ASC NULLS LAST]
 --CoalescePartitionsExec
 ----ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY 
[aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING@1 as rn1]
-------BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
"ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }], 
mode=[Sorted]
+------BoundedWindowAggExec: wdw=[ROW_NUMBER() PARTITION BY 
[aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING: Ok(Field { name: "ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] 
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, 
nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: 
Following(UInt64(NULL)) }], mode=[Sorted]
 --------SortExec: expr=[c1@0 ASC NULLS LAST]
 ----------CoalesceBatchesExec: target_batch_size=4096
 ------------RepartitionExec: partitioning=Hash([c1@0], 2), input_partitions=2
@@ -1989,10 +1989,10 @@ Sort: aggregate_test_100.c1 ASC NULLS LAST
 physical_plan
 SortExec: expr=[c1@0 ASC NULLS LAST]
 --ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 3 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER 
BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING@3 as sum2]
-----BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], 
mode=[Sorted]
+----BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING: Ok(Field { name: "SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: 
Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
 ------SortPreservingMergeExec: [c9@1 ASC NULLS LAST]
 --------SortExec: expr=[c9@1 ASC NULLS LAST]
-----------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { 
name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 
0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) }], 
mode=[Sorted]
+----------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 3 FOLLOWING: Ok(Field { name: 
"SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 
FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, start_ [...]
 ------------SortExec: expr=[c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]
 --------------CoalesceBatchesExec: target_batch_size=4096
 ----------------RepartitionExec: partitioning=Hash([c1@0], 2), 
input_partitions=2
@@ -2081,11 +2081,11 @@ physical_plan
 GlobalLimitExec: skip=0, fetch=5
 --SortExec: fetch=5, expr=[c9@0 ASC NULLS LAST]
 ----ProjectionExec: expr=[c9@2 as c9, SUM(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@4 as sum1, 
SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, 
aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 5 FOLLOWING@6 as sum2, SUM(aggregate_test_100.c9) 
PARTITION BY [aggregate_test_100.c1, aggregate_test_1 [...]
-------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], 
mode=[Sorted]
---------ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c9@3 as c9, 
SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, 
aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, 
aggregate_test_100.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED 
FOLLOWING@4 as SUM(aggregate_test_100.c9), SUM(aggregate_test_100.c9) PARTITION 
BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLO 
[...]
-----------WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
-------------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { 
name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 
0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], 
mode=[Sorted]
---------------WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
+------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING: Ok(Field { name: 
"SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, 
aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: UInt64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }) [...]
+--------ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c9@3 as c9, 
SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, 
aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, 
aggregate_test_100.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED 
FOLLOWING@4 as SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, 
aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, 
aggregate_test_100.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING A [...]
+----------WindowAggExec: wdw=[SUM(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 
ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING 
AND UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(aggregate_test_100.c9) PARTITION 
BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] 
ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING [...]
+------------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING: Ok(Field { name: 
"SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, 
aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: UInt64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: [...]
+--------------WindowAggExec: wdw=[SUM(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 
ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING 
AND UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(aggregate_test_100.c9) PARTITION 
BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] 
ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLO [...]
 ----------------SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@3 
ASC NULLS LAST,c8@2 ASC NULLS LAST]
 ------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, 
c2, c8, c9], has_header=true
 
@@ -2137,13 +2137,13 @@ Projection: t1.c9, SUM(t1.c9) PARTITION BY [t1.c1, 
t1.c2] ORDER BY [t1.c9 ASC NU
 physical_plan
 ProjectionExec: expr=[c9@1 as c9, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER 
BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@4 as sum1, 
SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST] 
ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@6 as sum2, SUM(t1.c9) PARTITION BY 
[t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING@3 as sum3, SUM(t1.c9) PARTITION BY 
[t1.c2, t1.c1_alias] ORDER BY [...]
 --GlobalLimitExec: skip=0, fetch=5
-----BoundedWindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)", 
data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Rows, start_bound: 
Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
-------ProjectionExec: expr=[c2@0 as c2, c9@2 as c9, c1_alias@3 as c1_alias, 
SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING@4 as 
SUM(t1.c9), SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS 
LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@5 as SUM(t1.c9), SUM(t1.c9) 
PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC 
NULLS LAST] ROWS BETWEEN 1 PRECEDING AND  [...]
---------WindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)", 
data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Rows, start_bound: 
Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
+----BoundedWindowAggExec: wdw=[SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] 
ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING: 
Ok(Field { name: "SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: UInt64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: 
Following(UInt64(5)) }], mode=[Sorted]
+------ProjectionExec: expr=[c2@0 as c2, c9@2 as c9, c1_alias@3 as c1_alias, 
SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING@4 as 
SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING, SUM(t1.c9) 
PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 
PRECEDING AND 5 FOLLOWING@5 as SUM(t1.c9 [...]
+--------WindowAggExec: wdw=[SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER 
BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 
UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(t1.c9) PARTITION BY [t1.c2, 
t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 
1 PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UI [...]
 ----------SortExec: expr=[c2@0 ASC NULLS LAST,c1_alias@3 ASC NULLS LAST,c9@2 
ASC NULLS LAST,c8@1 ASC NULLS LAST]
-------------ProjectionExec: expr=[c2@1 as c2, c8@2 as c8, c9@3 as c9, 
c1_alias@4 as c1_alias, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 
ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED 
FOLLOWING@5 as SUM(t1.c9), SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY 
[t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@6 as SUM(t1.c9)]
---------------BoundedWindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: 
"SUM(t1.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: 
Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
-----------------WindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)", 
data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Rows, start_bound: 
Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
+------------ProjectionExec: expr=[c2@1 as c2, c8@2 as c8, c9@3 as c9, 
c1_alias@4 as c1_alias, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 
ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED 
FOLLOWING@5 as SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS 
LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING, 
SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 5 FOLLOW [...]
+--------------BoundedWindowAggExec: wdw=[SUM(t1.c9) PARTITION BY [t1.c1, 
t1.c2] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING: Ok(Field { name: "SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY 
[t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: 
UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), 
frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: 
Following(UInt64(5)) }], mode=[Sorted]
+----------------WindowAggExec: wdw=[SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] 
ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING 
AND UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(t1.c9) PARTITION BY [t1.c1, 
t1.c2] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 
PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: true, dict_id: 
0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64 [...]
 ------------------SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@3 
ASC NULLS LAST,c8@2 ASC NULLS LAST]
 --------------------ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c8@2 as c8, 
c9@3 as c9, c1@0 as c1_alias]
 ----------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, 
c2, c8, c9], has_header=true
@@ -2188,9 +2188,9 @@ ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2]
 --GlobalLimitExec: skip=0, fetch=5
 ----SortExec: fetch=5, expr=[c9@2 ASC NULLS LAST]
 ------ProjectionExec: expr=[SUM(aggregate_test_100.c12) ORDER BY 
[aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] 
GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as sum1, 
SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST] 
GROUPS BETWEEN 5 PRECEDING AND 3 PRECEDING@4 as sum2, c9@1 as c9]
---------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c12): Ok(Field { 
name: "SUM(aggregate_test_100.c12)", data_type: Float64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Groups, start_bound: Preceding(UInt64(5)), end_bound: 
Preceding(UInt64(3)) }], mode=[Sorted]
-----------ProjectionExec: expr=[c1@0 as c1, c9@2 as c9, c12@3 as c12, 
SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, 
aggregate_test_100.c2 ASC NULLS LAST] GROUPS BETWEEN 1 PRECEDING AND 1 
FOLLOWING@4 as SUM(aggregate_test_100.c12)]
-------------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c12): Ok(Field { 
name: "SUM(aggregate_test_100.c12)", data_type: Float64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Groups, start_bound: Preceding(UInt64(1)), end_bound: 
Following(UInt64(1)) }], mode=[Sorted]
+--------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c12) ORDER BY 
[aggregate_test_100.c1 ASC NULLS LAST] GROUPS BETWEEN 5 PRECEDING AND 3 
PRECEDING: Ok(Field { name: "SUM(aggregate_test_100.c12) ORDER BY 
[aggregate_test_100.c1 ASC NULLS LAST] GROUPS BETWEEN 5 PRECEDING AND 3 
PRECEDING", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Groups, start_bound: 
Preceding(UInt64(5)), end_bound: Preceding(UInt64(3)) }], mo [...]
+----------ProjectionExec: expr=[c1@0 as c1, c9@2 as c9, c12@3 as c12, 
SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, 
aggregate_test_100.c2 ASC NULLS LAST] GROUPS BETWEEN 1 PRECEDING AND 1 
FOLLOWING@4 as SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC 
NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] GROUPS BETWEEN 1 PRECEDING 
AND 1 FOLLOWING]
+------------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c12) ORDER BY 
[aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] 
GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, 
aggregate_test_100.c2 ASC NULLS LAST] GROUPS BETWEEN 1 PRECEDING AND 1 
FOLLOWING", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: G [...]
 --------------SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]
 ----------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, 
c2, c9, c12], has_header=true
 
@@ -2226,7 +2226,7 @@ Limit: skip=0, fetch=5
 physical_plan
 GlobalLimitExec: skip=0, fetch=5
 --ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@1 as rn1]
-----BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", 
data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+----BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { 
name: "ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: 
false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame 
{ units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], 
mode=[Sorted]
 ------SortExec: expr=[c9@0 ASC NULLS LAST]
 --------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], 
has_header=true
 
@@ -2265,7 +2265,7 @@ Limit: skip=0, fetch=5
 physical_plan
 GlobalLimitExec: skip=0, fetch=5
 --ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@1 as rn1]
-----BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", 
data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+----BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 
DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { 
name: "ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: 
false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame 
{ units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], 
mode=[Sorted]
 ------SortExec: expr=[c9@0 DESC]
 --------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], 
has_header=true
 
@@ -2305,7 +2305,7 @@ physical_plan
 GlobalLimitExec: skip=0, fetch=5
 --SortExec: fetch=5, expr=[rn1@1 DESC]
 ----ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@1 as rn1]
-------BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
"ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+------BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 
DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { 
name: "ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: 
false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame 
{ units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], 
mode=[Sorted]
 --------SortExec: expr=[c9@0 DESC]
 ----------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], 
has_header=true
 
@@ -2348,7 +2348,7 @@ physical_plan
 GlobalLimitExec: skip=0, fetch=5
 --SortExec: fetch=5, expr=[rn1@1 ASC NULLS LAST,c9@0 ASC NULLS LAST]
 ----ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@1 as rn1]
-------BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
"ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+------BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 
DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { 
name: "ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: 
false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame 
{ units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], 
mode=[Sorted]
 --------SortExec: expr=[c9@0 DESC]
 ----------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], 
has_header=true
 
@@ -2401,7 +2401,7 @@ Limit: skip=0, fetch=5
 physical_plan
 GlobalLimitExec: skip=0, fetch=5
 --ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@1 as rn1]
-----BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", 
data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+----BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 
DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { 
name: "ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: 
false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame 
{ units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], 
mode=[Sorted]
 ------SortExec: expr=[c9@0 DESC]
 --------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], 
has_header=true
 
@@ -2532,10 +2532,10 @@ ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, 
sum3@2 as sum3, min1@3 as
 --GlobalLimitExec: skip=0, fetch=5
 ----SortExec: fetch=5, expr=[inc_col@24 DESC]
 ------ProjectionExec: expr=[SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING@13 as sum1, SUM(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 
FOLLOWING@14 as sum2, SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 
FOLLOWING@15 as sum3, MIN(annotated_data_finite.inc_col) ORDER B [...]
---------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.desc_col): 
Ok(Field { name: "SUM(annotated_data_finite.desc_col)", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Rows, start_bound: Preceding(UInt64(8)), end_bound: 
Following(UInt64(1)) }, COUNT(UInt8(1)): Ok(Field { name: "COUNT(UInt8(1))", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Rows, [...]
-----------ProjectionExec: expr=[inc_col@1 as inc_col, desc_col@2 as desc_col, 
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING@3 as 
SUM(annotated_data_finite.inc_col), SUM(annotated_data_finite.desc_col) ORDER 
BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 
FOLLOWING@4 as SUM(annotated_data_finite.desc_col), 
SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts [...]
-------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col): 
Ok(Field { name: "SUM(annotated_data_finite.inc_col)", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: 
Following(Int32(1)) }, SUM(annotated_data_finite.desc_col): Ok(Field { name: 
"SUM(annotated_data_finite.desc_col)", data_type: Int64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata [...]
---------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col): 
Ok(Field { name: "SUM(annotated_data_finite.inc_col)", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Range, start_bound: Preceding(Int32(4)), end_bound: 
Following(Int32(1)) }, SUM(annotated_data_finite.desc_col): Ok(Field { name: 
"SUM(annotated_data_finite.desc_col)", data_type: Int64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadat [...]
+--------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.desc_col) ROWS 
BETWEEN 8 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_finite.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(8)), 
end_bound: Following(UInt64(1)) }, COUNT(UInt8(1)) ROWS BETWEEN 8 PRECEDING AND 
1 FOLLOWING: Ok(Field { name: "COUNT(UInt8(1)) [...]
+----------ProjectionExec: expr=[inc_col@1 as inc_col, desc_col@2 as desc_col, 
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING@3 as 
SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, 
SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC 
NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING@4 as SUM(annotate [...]
+------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING 
AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Int32(10)), end_bound: Follow [...]
+--------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING 
AND 4 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Int32(4)), end_bound: Fol [...]
 ----------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, 
inc_col, desc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
 
 query IIIIIIIIIIIIIIIIIIIIIIII
@@ -2620,8 +2620,8 @@ ProjectionExec: expr=[fv1@0 as fv1, fv2@1 as fv2, lv1@2 
as lv1, lv2@3 as lv2, nv
 --GlobalLimitExec: skip=0, fetch=5
 ----SortExec: fetch=5, expr=[ts@24 DESC]
 ------ProjectionExec: expr=[FIRST_VALUE(annotated_data_finite.inc_col) ORDER 
BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING@10 as fv1, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 
FOLLOWING@11 as fv2, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING@12 as lv1, LAST_VALUE(annotated_d [...]
---------BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data_finite.inc_col): 
Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col)", data_type: 
Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), 
frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), 
end_bound: Following(Int32(1)) }, FIRST_VALUE(annotated_data_finite.inc_col): 
Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col)", data_type: 
Int32, nullable: true, dict_id: 0, dict_i [...]
-----------BoundedWindowAggExec: 
wdw=[FIRST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: 
"FIRST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)) 
}, FIRST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: 
"FIRST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, 
dict_id: 0, dict [...]
+--------BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING 
AND 1 FOLLOWING: Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING 
AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int32(10)), end_b [...]
+----------BoundedWindowAggExec: 
wdw=[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING: Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col) ORDER 
BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Int32(1)),  [...]
 ------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, 
inc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
 
 query IIIIIIIIIIIIIIIIIIIIIIII
@@ -2692,8 +2692,8 @@ ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, 
min1@2 as min1, min2@3 as
 --GlobalLimitExec: skip=0, fetch=5
 ----SortExec: fetch=5, expr=[inc_col@10 ASC NULLS LAST]
 ------ProjectionExec: expr=[SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING@7 as sum1, SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING@2 as sum2, MIN(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING@8 as min1, MIN(annotated_data_fi [...]
---------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col): 
Ok(Field { name: "SUM(annotated_data_finite.inc_col)", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: 
Following(Int32(5)) }, MIN(annotated_data_finite.inc_col): Ok(Field { name: 
"MIN(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} [...]
-----------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col): 
Ok(Field { name: "SUM(annotated_data_finite.inc_col)", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: 
Following(Int32(3)) }, MIN(annotated_data_finite.inc_col): Ok(Field { name: 
"MIN(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata:  [...]
+--------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Int32(NULL)), end_b [...]
+----------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col) ORDER 
BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite.inc_col) ORDER 
BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int32(NULL)), [...]
 ------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, 
inc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
 
 query IIIIIIIIRR
@@ -2745,8 +2745,8 @@ ProjectionExec: expr=[first_value1@0 as first_value1, 
first_value2@1 as first_va
 --GlobalLimitExec: skip=0, fetch=5
 ----SortExec: fetch=5, expr=[inc_col@5 ASC NULLS LAST]
 ------ProjectionExec: expr=[FIRST_VALUE(annotated_data_finite.inc_col) ORDER 
BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@4 as first_value1, FIRST_VALUE(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING@2 as first_value2, 
LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts 
ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOW [...]
---------BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data_finite.inc_col): 
Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col)", data_type: 
Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), 
frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), 
end_bound: Following(UInt64(1)) }, LAST_VALUE(annotated_data_finite.inc_col): 
Ok(Field { name: "LAST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, 
nullable: true, dict_id: 0, dict_ [...]
-----------BoundedWindowAggExec: 
wdw=[FIRST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: 
"FIRST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: 
Following(UInt64(3)) }, LAST_VALUE(annotated_data_finite.inc_col): Ok(Field { 
name: "LAST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: 
true, dict_id: 0, dic [...]
+--------BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED 
PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts 
ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING", data_type: 
Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), 
frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64 [...]
+----------BoundedWindowAggExec: 
wdw=[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING: Ok(Field { name: 
"FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts 
DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING", data_type: 
Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), 
frame: WindowFrame { units: Rows, start_bound: Preceding( [...]
 ------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, 
inc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
 
 query IIIII
@@ -2790,8 +2790,8 @@ physical_plan
 ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1, 
count2@3 as count2]
 --GlobalLimitExec: skip=0, fetch=5
 ----ProjectionExec: expr=[SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@4 as sum1, SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING@2 as sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@5 as count1, COUNT(anno [...]
-------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col): 
Ok(Field { name: "SUM(annotated_data_infinite.inc_col)", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: 
Following(UInt64(1)) }, COUNT(annotated_data_infinite.inc_col): Ok(Field { 
name: "COUNT(annotated_data_infinite.inc_col)", data_type: Int64, nullable: 
true, dict_id: 0, dict_is_ordered: false, m [...]
---------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col): 
Ok(Field { name: "SUM(annotated_data_infinite.inc_col)", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: 
Following(UInt64(3)) }, COUNT(annotated_data_infinite.inc_col): Ok(Field { 
name: "COUNT(annotated_data_infinite.inc_col)", data_type: Int64, nullable: 
true, dict_id: 0, dict_is_ordered: false, [...]
+------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)), e [...]
+--------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NUL [...]
 ----------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, 
inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST], 
has_header=true
 
 
@@ -2836,8 +2836,8 @@ physical_plan
 ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1, 
count2@3 as count2]
 --GlobalLimitExec: skip=0, fetch=5
 ----ProjectionExec: expr=[SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@4 as sum1, SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING@2 as sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@5 as count1, COUNT(anno [...]
-------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col): 
Ok(Field { name: "SUM(annotated_data_infinite.inc_col)", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: 
Following(UInt64(1)) }, COUNT(annotated_data_infinite.inc_col): Ok(Field { 
name: "COUNT(annotated_data_infinite.inc_col)", data_type: Int64, nullable: 
true, dict_id: 0, dict_is_ordered: false, m [...]
---------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col): 
Ok(Field { name: "SUM(annotated_data_infinite.inc_col)", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: 
Following(UInt64(3)) }, COUNT(annotated_data_infinite.inc_col): Ok(Field { 
name: "COUNT(annotated_data_infinite.inc_col)", data_type: Int64, nullable: 
true, dict_id: 0, dict_is_ordered: false, [...]
+------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)), e [...]
+--------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NUL [...]
 ----------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, 
inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST], 
has_header=true
 
 
@@ -2934,12 +2934,12 @@ Projection: annotated_data_infinite2.a, 
annotated_data_infinite2.b, annotated_da
 physical_plan
 ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, 
SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS 
LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING@8 as sum1, SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULL 
[...]
 --GlobalLimitExec: skip=0, fetch=5
-----BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c): Ok(Field { 
name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) 
}, SUM(annotated_data_infinite2.c): Ok(Field { name: 
"SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 
0, dict_is_ordered: false, metadata: {} }), frame: Window [...]
-------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c): Ok(Field { 
name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) 
}, SUM(annotated_data_infinite2.c): Ok(Field { name: 
"SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 
0, dict_is_ordered: false, metadata: {} }), frame: Wind [...]
---------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c): Ok(Field { 
name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) 
}, SUM(annotated_data_infinite2.c): Ok(Field { name: 
"SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 
0, dict_is_ordered: false, metadata: {} }), frame: Wi [...]
-----------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c): Ok(Field 
{ name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) 
}, SUM(annotated_data_infinite2.c): Ok(Field { name: 
"SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 
0, dict_is_ordered: false, metadata: {} }), frame:  [...]
-------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c): 
Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) 
}, SUM(annotated_data_infinite2.c): Ok(Field { name: 
"SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 
0, dict_is_ordered: false, metadata: {} }), frame [...]
---------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c): 
Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) 
}, SUM(annotated_data_infinite2.c): Ok(Field { name: 
"SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 
0, dict_is_ordered: false, metadata: {} }), fra [...]
+----BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS 
LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC 
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.d] 
ORDER BY [annotated_data_infinite2.a ASC NULLS LAST, annotated_data_infinite2.b 
ASC NULLS LAST, annotated_data_inf [...]
+------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, 
annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FO 
[...]
+--------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) PARTITION 
BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0 [...]
+----------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) PARTITION 
BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC 
NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS 
LAST, annotated_data_infinite2.c ASC NULLS LAST] [...]
+------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AN [...]
+--------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite2.c) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict [...]
 ----------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, 
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS 
LAST, c@2 ASC NULLS LAST], has_header=true
 
 
@@ -3003,17 +3003,17 @@ physical_plan
 GlobalLimitExec: skip=0, fetch=5
 --SortExec: fetch=5, expr=[c@2 ASC NULLS LAST]
 ----ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, 
SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, 
annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING@8 as sum1, SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS 
LAST] ROWS BET [...]
-------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c): Ok(Field { 
name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) 
}, SUM(annotated_data_finite2.c): Ok(Field { name: 
"SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame  [...]
+------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST, 
annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.a ASC NULLS LAST, annotated_data_finite2.b ASC NULLS 
LAST, annotated_data_finite2.c ASC NULLS [...]
 --------SortExec: expr=[d@3 ASC NULLS LAST,a@0 ASC NULLS LAST,b@1 ASC NULLS 
LAST,c@2 ASC NULLS LAST]
-----------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c): Ok(Field { 
name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) 
}, SUM(annotated_data_finite2.c): Ok(Field { name: 
"SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFr [...]
+----------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c) PARTITION 
BY [annotated_data_finite2.b, annotated_data_finite2.a, 
annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] 
ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, 
annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING", data_t [...]
 ------------SortExec: expr=[b@1 ASC NULLS LAST,a@0 ASC NULLS LAST,d@3 ASC 
NULLS LAST,c@2 ASC NULLS LAST]
---------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c): 
Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) 
}, SUM(annotated_data_finite2.c): Ok(Field { name: 
"SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: Wind [...]
+--------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c) 
PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING: Ok(Field { name: "SUM(annotated_data_finite2.c) PARTITION BY 
[annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ [...]
 ----------------SortExec: expr=[b@1 ASC NULLS LAST,a@0 ASC NULLS LAST,c@2 ASC 
NULLS LAST]
-------------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c): 
Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) 
}, SUM(annotated_data_finite2.c): Ok(Field { name: 
"SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame:  [...]
+------------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c) 
PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS 
LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, 
annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEE [...]
 --------------------SortExec: expr=[a@0 ASC NULLS LAST,d@3 ASC NULLS LAST,b@1 
ASC NULLS LAST,c@2 ASC NULLS LAST]
-----------------------BoundedWindowAggExec: 
wdw=[SUM(annotated_data_finite2.c): Ok(Field { name: 
"SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, 
SUM(annotated_data_finite2.c): Ok(Field { name: 
"SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), fra [...]
+----------------------BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c) 
PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, 
annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] 
ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY 
[annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOW 
[...]
 ------------------------SortExec: expr=[a@0 ASC NULLS LAST,b@1 ASC NULLS 
LAST,d@3 ASC NULLS LAST,c@2 ASC NULLS LAST]
---------------------------BoundedWindowAggExec: 
wdw=[SUM(annotated_data_finite2.c): Ok(Field { name: 
"SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, 
SUM(annotated_data_finite2.c): Ok(Field { name: 
"SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), [...]
+--------------------------BoundedWindowAggExec: 
wdw=[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] 
ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, 
annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] 
ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, 
dict_id: [...]
 ----------------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, 
c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS 
LAST], has_header=true
 
 
@@ -3105,10 +3105,10 @@ ProjectionExec: expr=[min1@0 as min1, max1@1 as max1]
 --GlobalLimitExec: skip=0, fetch=5
 ----SortExec: fetch=5, expr=[c3@2 ASC NULLS LAST]
 ------ProjectionExec: expr=[MAX(aggregate_test_100.c12) ORDER BY 
[aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@3 as min1, MIN(aggregate_test_100.c12) PARTITION BY 
[aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING@2 as max1, c3@0 as c3]
---------BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c12): Ok(Field { 
name: "MAX(aggregate_test_100.c12)", data_type: Float64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Float64(NULL)), end_bound: CurrentRow }], 
mode=[Sorted]
+--------BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c12) ORDER BY 
[aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW: Ok(Field { name: "MAX(aggregate_test_100.c12) ORDER BY 
[aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Float64(NULL)), end_bound: CurrentR [...]
 ----------SortExec: expr=[c12@1 ASC NULLS LAST]
-------------ProjectionExec: expr=[c3@0 as c3, c12@2 as c12, 
MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@3 as MIN(aggregate_test_100.c12)]
---------------WindowAggExec: wdw=[MIN(aggregate_test_100.c12): Ok(Field { 
name: "MIN(aggregate_test_100.c12)", data_type: Float64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: 
Following(UInt64(NULL)) }]
+------------ProjectionExec: expr=[c3@0 as c3, c12@2 as c12, 
MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@3 as MIN(aggregate_test_100.c12) 
PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND 
UNBOUNDED FOLLOWING]
+--------------WindowAggExec: wdw=[MIN(aggregate_test_100.c12) PARTITION BY 
[aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING: Ok(Field { name: "MIN(aggregate_test_100.c12) PARTITION BY 
[aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: 
Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]
 ----------------SortExec: expr=[c11@1 ASC NULLS LAST]
 ------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c3, 
c11, c12], has_header=true
 
@@ -3150,7 +3150,7 @@ ProjectionExec: expr=[min1@0 as min1, max1@1 as max1]
 --GlobalLimitExec: skip=0, fetch=5
 ----SortExec: fetch=5, expr=[c3@2 ASC NULLS LAST]
 ------ProjectionExec: expr=[MAX(aggregate_test_100.c12) ORDER BY 
[aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@2 as min1, MIN(aggregate_test_100.c12) ORDER BY 
[aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@3 as max1, c3@0 as c3]
---------BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c12): Ok(Field { 
name: "MAX(aggregate_test_100.c12)", data_type: Float64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Float64(NULL)), end_bound: CurrentRow }, 
MIN(aggregate_test_100.c12): Ok(Field { name: "MIN(aggregate_test_100.c12)", 
data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units [...]
+--------BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c12) ORDER BY 
[aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW: Ok(Field { name: "MAX(aggregate_test_100.c12) ORDER BY 
[aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Float64(NULL)), end_bound: CurrentR [...]
 ----------SortExec: expr=[c12@1 ASC NULLS LAST]
 ------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c3, 
c12], has_header=true
 
diff --git 
a/datafusion/core/tests/user_defined/user_defined_window_functions.rs 
b/datafusion/core/tests/user_defined/user_defined_window_functions.rs
index 946aa31172..dfa1781285 100644
--- a/datafusion/core/tests/user_defined/user_defined_window_functions.rs
+++ b/datafusion/core/tests/user_defined/user_defined_window_functions.rs
@@ -80,20 +80,20 @@ async fn test_udwf() {
     let TestContext { ctx, test_state } = TestContext::new(test_state);
 
     let expected = vec![
-        "+---+---+-----+--------------------+",
-        "| x | y | val | odd_counter(t.val) |",
-        "+---+---+-----+--------------------+",
-        "| 1 | a | 0   | 1                  |",
-        "| 1 | b | 1   | 1                  |",
-        "| 1 | c | 2   | 1                  |",
-        "| 2 | d | 3   | 2                  |",
-        "| 2 | e | 4   | 2                  |",
-        "| 2 | f | 5   | 2                  |",
-        "| 2 | g | 6   | 2                  |",
-        "| 2 | h | 6   | 2                  |",
-        "| 2 | i | 6   | 2                  |",
-        "| 2 | j | 6   | 2                  |",
-        "+---+---+-----+--------------------+",
+    
"+---+---+-----+-----------------------------------------------------------------------------------------------------------------------+",
+    "| x | y | val | odd_counter(t.val) PARTITION BY [t.x] ORDER BY [t.y ASC 
NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW |",
+    
"+---+---+-----+-----------------------------------------------------------------------------------------------------------------------+",
+    "| 1 | a | 0   | 1                                                         
                                                            |",
+    "| 1 | b | 1   | 1                                                         
                                                            |",
+    "| 1 | c | 2   | 1                                                         
                                                            |",
+    "| 2 | d | 3   | 2                                                         
                                                            |",
+    "| 2 | e | 4   | 2                                                         
                                                            |",
+    "| 2 | f | 5   | 2                                                         
                                                            |",
+    "| 2 | g | 6   | 2                                                         
                                                            |",
+    "| 2 | h | 6   | 2                                                         
                                                            |",
+    "| 2 | i | 6   | 2                                                         
                                                            |",
+    "| 2 | j | 6   | 2                                                         
                                                            |",
+    
"+---+---+-----+-----------------------------------------------------------------------------------------------------------------------+",
     ];
     assert_batches_eq!(
         expected,
@@ -111,20 +111,20 @@ async fn test_udwf_bounded_window_ignores_frame() {
 
     // Since the UDWF doesn't say it needs the window frame, the frame is ignored
     let expected = vec![
-        "+---+---+-----+--------------------+",
-        "| x | y | val | odd_counter(t.val) |",
-        "+---+---+-----+--------------------+",
-        "| 1 | a | 0   | 1                  |",
-        "| 1 | b | 1   | 1                  |",
-        "| 1 | c | 2   | 1                  |",
-        "| 2 | d | 3   | 2                  |",
-        "| 2 | e | 4   | 2                  |",
-        "| 2 | f | 5   | 2                  |",
-        "| 2 | g | 6   | 2                  |",
-        "| 2 | h | 6   | 2                  |",
-        "| 2 | i | 6   | 2                  |",
-        "| 2 | j | 6   | 2                  |",
-        "+---+---+-----+--------------------+",
+    
"+---+---+-----+--------------------------------------------------------------------------------------------------------------+",
+    "| x | y | val | odd_counter(t.val) PARTITION BY [t.x] ORDER BY [t.y ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING |",
+    
"+---+---+-----+--------------------------------------------------------------------------------------------------------------+",
+    "| 1 | a | 0   | 1                                                         
                                                   |",
+    "| 1 | b | 1   | 1                                                         
                                                   |",
+    "| 1 | c | 2   | 1                                                         
                                                   |",
+    "| 2 | d | 3   | 2                                                         
                                                   |",
+    "| 2 | e | 4   | 2                                                         
                                                   |",
+    "| 2 | f | 5   | 2                                                         
                                                   |",
+    "| 2 | g | 6   | 2                                                         
                                                   |",
+    "| 2 | h | 6   | 2                                                         
                                                   |",
+    "| 2 | i | 6   | 2                                                         
                                                   |",
+    "| 2 | j | 6   | 2                                                         
                                                   |",
+    
"+---+---+-----+--------------------------------------------------------------------------------------------------------------+",
     ];
     assert_batches_eq!(
         expected,
@@ -142,20 +142,20 @@ async fn test_udwf_bounded_window() {
     let TestContext { ctx, test_state } = TestContext::new(test_state);
 
     let expected = vec![
-        "+---+---+-----+--------------------+",
-        "| x | y | val | odd_counter(t.val) |",
-        "+---+---+-----+--------------------+",
-        "| 1 | a | 0   | 1                  |",
-        "| 1 | b | 1   | 1                  |",
-        "| 1 | c | 2   | 1                  |",
-        "| 2 | d | 3   | 1                  |",
-        "| 2 | e | 4   | 2                  |",
-        "| 2 | f | 5   | 1                  |",
-        "| 2 | g | 6   | 1                  |",
-        "| 2 | h | 6   | 0                  |",
-        "| 2 | i | 6   | 0                  |",
-        "| 2 | j | 6   | 0                  |",
-        "+---+---+-----+--------------------+",
+    
"+---+---+-----+--------------------------------------------------------------------------------------------------------------+",
+    "| x | y | val | odd_counter(t.val) PARTITION BY [t.x] ORDER BY [t.y ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING |",
+    
"+---+---+-----+--------------------------------------------------------------------------------------------------------------+",
+    "| 1 | a | 0   | 1                                                         
                                                   |",
+    "| 1 | b | 1   | 1                                                         
                                                   |",
+    "| 1 | c | 2   | 1                                                         
                                                   |",
+    "| 2 | d | 3   | 1                                                         
                                                   |",
+    "| 2 | e | 4   | 2                                                         
                                                   |",
+    "| 2 | f | 5   | 1                                                         
                                                   |",
+    "| 2 | g | 6   | 1                                                         
                                                   |",
+    "| 2 | h | 6   | 0                                                         
                                                   |",
+    "| 2 | i | 6   | 0                                                         
                                                   |",
+    "| 2 | j | 6   | 0                                                         
                                                   |",
+    
"+---+---+-----+--------------------------------------------------------------------------------------------------------------+",
     ];
     assert_batches_eq!(
         expected,
@@ -175,20 +175,20 @@ async fn test_stateful_udwf() {
     let TestContext { ctx, test_state } = TestContext::new(test_state);
 
     let expected = vec![
-        "+---+---+-----+--------------------+",
-        "| x | y | val | odd_counter(t.val) |",
-        "+---+---+-----+--------------------+",
-        "| 1 | a | 0   | 0                  |",
-        "| 1 | b | 1   | 1                  |",
-        "| 1 | c | 2   | 1                  |",
-        "| 2 | d | 3   | 1                  |",
-        "| 2 | e | 4   | 1                  |",
-        "| 2 | f | 5   | 2                  |",
-        "| 2 | g | 6   | 2                  |",
-        "| 2 | h | 6   | 2                  |",
-        "| 2 | i | 6   | 2                  |",
-        "| 2 | j | 6   | 2                  |",
-        "+---+---+-----+--------------------+",
+    
"+---+---+-----+-----------------------------------------------------------------------------------------------------------------------+",
+    "| x | y | val | odd_counter(t.val) PARTITION BY [t.x] ORDER BY [t.y ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW |",
+    
"+---+---+-----+-----------------------------------------------------------------------------------------------------------------------+",
+    "| 1 | a | 0   | 0                                                         
                                                            |",
+    "| 1 | b | 1   | 1                                                         
                                                            |",
+    "| 1 | c | 2   | 1                                                         
                                                            |",
+    "| 2 | d | 3   | 1                                                         
                                                            |",
+    "| 2 | e | 4   | 1                                                         
                                                            |",
+    "| 2 | f | 5   | 2                                                         
                                                            |",
+    "| 2 | g | 6   | 2                                                         
                                                            |",
+    "| 2 | h | 6   | 2                                                         
                                                            |",
+    "| 2 | i | 6   | 2                                                         
                                                            |",
+    "| 2 | j | 6   | 2                                                         
                                                            |",
+    
"+---+---+-----+-----------------------------------------------------------------------------------------------------------------------+",
     ];
     assert_batches_eq!(
         expected,
@@ -208,20 +208,20 @@ async fn test_stateful_udwf_bounded_window() {
     let TestContext { ctx, test_state } = TestContext::new(test_state);
 
     let expected = vec![
-        "+---+---+-----+--------------------+",
-        "| x | y | val | odd_counter(t.val) |",
-        "+---+---+-----+--------------------+",
-        "| 1 | a | 0   | 1                  |",
-        "| 1 | b | 1   | 1                  |",
-        "| 1 | c | 2   | 1                  |",
-        "| 2 | d | 3   | 1                  |",
-        "| 2 | e | 4   | 2                  |",
-        "| 2 | f | 5   | 1                  |",
-        "| 2 | g | 6   | 1                  |",
-        "| 2 | h | 6   | 0                  |",
-        "| 2 | i | 6   | 0                  |",
-        "| 2 | j | 6   | 0                  |",
-        "+---+---+-----+--------------------+",
+    
"+---+---+-----+--------------------------------------------------------------------------------------------------------------+",
+    "| x | y | val | odd_counter(t.val) PARTITION BY [t.x] ORDER BY [t.y ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING |",
+    
"+---+---+-----+--------------------------------------------------------------------------------------------------------------+",
+    "| 1 | a | 0   | 1                                                         
                                                   |",
+    "| 1 | b | 1   | 1                                                         
                                                   |",
+    "| 1 | c | 2   | 1                                                         
                                                   |",
+    "| 2 | d | 3   | 1                                                         
                                                   |",
+    "| 2 | e | 4   | 2                                                         
                                                   |",
+    "| 2 | f | 5   | 1                                                         
                                                   |",
+    "| 2 | g | 6   | 1                                                         
                                                   |",
+    "| 2 | h | 6   | 0                                                         
                                                   |",
+    "| 2 | i | 6   | 0                                                         
                                                   |",
+    "| 2 | j | 6   | 0                                                         
                                                   |",
+    
"+---+---+-----+--------------------------------------------------------------------------------------------------------------+",
     ];
     assert_batches_eq!(
         expected,
@@ -240,20 +240,20 @@ async fn test_udwf_query_include_rank() {
     let TestContext { ctx, test_state } = TestContext::new(test_state);
 
     let expected = vec![
-        "+---+---+-----+--------------------+",
-        "| x | y | val | odd_counter(t.val) |",
-        "+---+---+-----+--------------------+",
-        "| 1 | a | 0   | 3                  |",
-        "| 1 | b | 1   | 2                  |",
-        "| 1 | c | 2   | 1                  |",
-        "| 2 | d | 3   | 7                  |",
-        "| 2 | e | 4   | 6                  |",
-        "| 2 | f | 5   | 5                  |",
-        "| 2 | g | 6   | 4                  |",
-        "| 2 | h | 6   | 3                  |",
-        "| 2 | i | 6   | 2                  |",
-        "| 2 | j | 6   | 1                  |",
-        "+---+---+-----+--------------------+",
+    
"+---+---+-----+-----------------------------------------------------------------------------------------------------------------------+",
+    "| x | y | val | odd_counter(t.val) PARTITION BY [t.x] ORDER BY [t.y ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW |",
+    
"+---+---+-----+-----------------------------------------------------------------------------------------------------------------------+",
+    "| 1 | a | 0   | 3                                                         
                                                            |",
+    "| 1 | b | 1   | 2                                                         
                                                            |",
+    "| 1 | c | 2   | 1                                                         
                                                            |",
+    "| 2 | d | 3   | 7                                                         
                                                            |",
+    "| 2 | e | 4   | 6                                                         
                                                            |",
+    "| 2 | f | 5   | 5                                                         
                                                            |",
+    "| 2 | g | 6   | 4                                                         
                                                            |",
+    "| 2 | h | 6   | 3                                                         
                                                            |",
+    "| 2 | i | 6   | 2                                                         
                                                            |",
+    "| 2 | j | 6   | 1                                                         
                                                            |",
+    
"+---+---+-----+-----------------------------------------------------------------------------------------------------------------------+",
     ];
     assert_batches_eq!(
         expected,
@@ -272,20 +272,20 @@ async fn test_udwf_bounded_query_include_rank() {
     let TestContext { ctx, test_state } = TestContext::new(test_state);
 
     let expected = vec![
-        "+---+---+-----+--------------------+",
-        "| x | y | val | odd_counter(t.val) |",
-        "+---+---+-----+--------------------+",
-        "| 1 | a | 0   | 3                  |",
-        "| 1 | b | 1   | 2                  |",
-        "| 1 | c | 2   | 1                  |",
-        "| 2 | d | 3   | 7                  |",
-        "| 2 | e | 4   | 6                  |",
-        "| 2 | f | 5   | 5                  |",
-        "| 2 | g | 6   | 4                  |",
-        "| 2 | h | 6   | 3                  |",
-        "| 2 | i | 6   | 2                  |",
-        "| 2 | j | 6   | 1                  |",
-        "+---+---+-----+--------------------+",
+    
"+---+---+-----+--------------------------------------------------------------------------------------------------------------+",
+    "| x | y | val | odd_counter(t.val) PARTITION BY [t.x] ORDER BY [t.y ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING |",
+    
"+---+---+-----+--------------------------------------------------------------------------------------------------------------+",
+    "| 1 | a | 0   | 3                                                         
                                                   |",
+    "| 1 | b | 1   | 2                                                         
                                                   |",
+    "| 1 | c | 2   | 1                                                         
                                                   |",
+    "| 2 | d | 3   | 7                                                         
                                                   |",
+    "| 2 | e | 4   | 6                                                         
                                                   |",
+    "| 2 | f | 5   | 5                                                         
                                                   |",
+    "| 2 | g | 6   | 4                                                         
                                                   |",
+    "| 2 | h | 6   | 3                                                         
                                                   |",
+    "| 2 | i | 6   | 2                                                         
                                                   |",
+    "| 2 | j | 6   | 1                                                         
                                                   |",
+    
"+---+---+-----+--------------------------------------------------------------------------------------------------------------+",
     ];
     assert_batches_eq!(
         expected,


Reply via email to