This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 07804384cb Projection Expression - Input Field Inconsistencies during Projection (#10088)
07804384cb is described below

commit 07804384cbdcdd2861ec8a279632da32245e28f7
Author: Berkay Şahin <[email protected]>
AuthorDate: Mon Apr 22 22:35:48 2024 +0300

    Projection Expression - Input Field Inconsistencies during Projection (#10088)
    
    * agg fixes
    
    * test updates
    
    * fixing count mismatch
    
    * Update aggregate_statistics.rs
    
    * catch different names
    
    * minor
---
 .../src/physical_optimizer/aggregate_statistics.rs |  9 +--
 datafusion/core/src/physical_planner.rs            | 31 +++++---
 datafusion/functions-aggregate/src/first_last.rs   | 41 ++++++++---
 .../physical-expr/src/equivalence/projection.rs    |  6 +-
 .../test_files/agg_func_substitute.slt             |  8 +--
 datafusion/sqllogictest/test_files/aggregate.slt   | 12 ++--
 datafusion/sqllogictest/test_files/distinct_on.slt |  4 +-
 datafusion/sqllogictest/test_files/group_by.slt    | 82 +++++++++++-----------
 datafusion/sqllogictest/test_files/joins.slt       |  8 +--
 datafusion/sqllogictest/test_files/subquery.slt    | 26 +++++++
 10 files changed, 145 insertions(+), 82 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
index df54222270..98f8884e49 100644
--- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
+++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
@@ -35,9 +35,6 @@ use 
datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
 #[derive(Default)]
 pub struct AggregateStatistics {}
 
-/// The name of the column corresponding to [`COUNT_STAR_EXPANSION`]
-const COUNT_STAR_NAME: &str = "COUNT(*)";
-
 impl AggregateStatistics {
     #[allow(missing_docs)]
     pub fn new() -> Self {
@@ -144,7 +141,7 @@ fn take_optimizable(node: &dyn ExecutionPlan) -> 
Option<Arc<dyn ExecutionPlan>>
 fn take_optimizable_table_count(
     agg_expr: &dyn AggregateExpr,
     stats: &Statistics,
-) -> Option<(ScalarValue, &'static str)> {
+) -> Option<(ScalarValue, String)> {
     if let (&Precision::Exact(num_rows), Some(casted_expr)) = (
         &stats.num_rows,
         agg_expr.as_any().downcast_ref::<expressions::Count>(),
@@ -158,7 +155,7 @@ fn take_optimizable_table_count(
                 if lit_expr.value() == &COUNT_STAR_EXPANSION {
                     return Some((
                         ScalarValue::Int64(Some(num_rows as i64)),
-                        COUNT_STAR_NAME,
+                        casted_expr.name().to_owned(),
                     ));
                 }
             }
@@ -427,7 +424,7 @@ pub(crate) mod tests {
         /// What name would this aggregate produce in a plan?
         fn column_name(&self) -> &'static str {
             match self {
-                Self::CountStar => COUNT_STAR_NAME,
+                Self::CountStar => "COUNT(*)",
                 Self::ColumnA(_) => "COUNT(a)",
             }
         }
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 301f68c0f2..e6785b1dec 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -87,6 +87,7 @@ use datafusion_expr::expr::{
     WindowFunction,
 };
 use datafusion_expr::expr_rewriter::unnormalize_cols;
+use datafusion_expr::expr_vec_fmt;
 use 
datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
 use datafusion_expr::{
     DescribeTable, DmlStatement, Extension, Filter, RecursiveQuery,
@@ -108,6 +109,7 @@ fn create_function_physical_name(
     fun: &str,
     distinct: bool,
     args: &[Expr],
+    order_by: Option<&Vec<Expr>>,
 ) -> Result<String> {
     let names: Vec<String> = args
         .iter()
@@ -118,7 +120,12 @@ fn create_function_physical_name(
         true => "DISTINCT ",
         false => "",
     };
-    Ok(format!("{}({}{})", fun, distinct_str, names.join(",")))
+
+    let phys_name = format!("{}({}{})", fun, distinct_str, names.join(","));
+
+    Ok(order_by
+        .map(|order_by| format!("{} ORDER BY [{}]", phys_name, 
expr_vec_fmt!(order_by)))
+        .unwrap_or(phys_name))
 }
 
 fn physical_name(e: &Expr) -> Result<String> {
@@ -238,22 +245,30 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> 
Result<String> {
                 return internal_err!("Function `Expr` with name should be 
resolved.");
             }
 
-            create_function_physical_name(fun.name(), false, &fun.args)
+            create_function_physical_name(fun.name(), false, &fun.args, None)
         }
-        Expr::WindowFunction(WindowFunction { fun, args, .. }) => {
-            create_function_physical_name(&fun.to_string(), false, args)
+        Expr::WindowFunction(WindowFunction {
+            fun,
+            args,
+            order_by,
+            ..
+        }) => {
+            create_function_physical_name(&fun.to_string(), false, args, 
Some(order_by))
         }
         Expr::AggregateFunction(AggregateFunction {
             func_def,
             distinct,
             args,
             filter,
-            order_by: _,
+            order_by,
             null_treatment: _,
         }) => match func_def {
-            AggregateFunctionDefinition::BuiltIn(..) => {
-                create_function_physical_name(func_def.name(), *distinct, args)
-            }
+            AggregateFunctionDefinition::BuiltIn(..) => 
create_function_physical_name(
+                func_def.name(),
+                *distinct,
+                args,
+                order_by.as_ref(),
+            ),
             AggregateFunctionDefinition::UDF(fun) => {
                 // TODO: Add support for filter by in AggregateUDF
                 if filter.is_some() {
diff --git a/datafusion/functions-aggregate/src/first_last.rs b/datafusion/functions-aggregate/src/first_last.rs
index f8b388a4f3..8dc4cee87a 100644
--- a/datafusion/functions-aggregate/src/first_last.rs
+++ b/datafusion/functions-aggregate/src/first_last.rs
@@ -415,11 +415,9 @@ impl FirstValuePhysicalExpr {
     }
 
     pub fn convert_to_last(self) -> LastValuePhysicalExpr {
-        let name = if self.name.starts_with("FIRST") {
-            format!("LAST{}", &self.name[5..])
-        } else {
-            format!("LAST_VALUE({})", self.expr)
-        };
+        let mut name = format!("LAST{}", &self.name[5..]);
+        replace_order_by_clause(&mut name);
+
         let FirstValuePhysicalExpr {
             expr,
             input_data_type,
@@ -593,11 +591,9 @@ impl LastValuePhysicalExpr {
     }
 
     pub fn convert_to_first(self) -> FirstValuePhysicalExpr {
-        let name = if self.name.starts_with("LAST") {
-            format!("FIRST{}", &self.name[4..])
-        } else {
-            format!("FIRST_VALUE({})", self.expr)
-        };
+        let mut name = format!("FIRST{}", &self.name[4..]);
+        replace_order_by_clause(&mut name);
+
         let LastValuePhysicalExpr {
             expr,
             input_data_type,
@@ -905,6 +901,31 @@ fn convert_to_sort_cols(
         .collect::<Vec<_>>()
 }
 
+fn replace_order_by_clause(order_by: &mut String) {
+    let suffixes = [
+        (" DESC NULLS FIRST]", " ASC NULLS LAST]"),
+        (" ASC NULLS FIRST]", " DESC NULLS LAST]"),
+        (" DESC NULLS LAST]", " ASC NULLS FIRST]"),
+        (" ASC NULLS LAST]", " DESC NULLS FIRST]"),
+    ];
+
+    if let Some(start) = order_by.find("ORDER BY [") {
+        if let Some(end) = order_by[start..].find(']') {
+            let order_by_start = start + 9;
+            let order_by_end = start + end;
+
+            let column_order = &order_by[order_by_start..=order_by_end];
+            for &(suffix, replacement) in &suffixes {
+                if column_order.ends_with(suffix) {
+                    let new_order = column_order.replace(suffix, replacement);
+                    order_by.replace_range(order_by_start..=order_by_end, 
&new_order);
+                    break;
+                }
+            }
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use arrow::array::Int64Array;
diff --git a/datafusion/physical-expr/src/equivalence/projection.rs b/datafusion/physical-expr/src/equivalence/projection.rs
index 8c747ab8a2..18e350b097 100644
--- a/datafusion/physical-expr/src/equivalence/projection.rs
+++ b/datafusion/physical-expr/src/equivalence/projection.rs
@@ -20,7 +20,7 @@ use std::sync::Arc;
 use arrow::datatypes::SchemaRef;
 
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion_common::Result;
+use datafusion_common::{internal_err, Result};
 
 use crate::expressions::Column;
 use crate::PhysicalExpr;
@@ -67,6 +67,10 @@ impl ProjectionMapping {
                             // Conceptually, `source_expr` and `expression` 
should be the same.
                             let idx = col.index();
                             let matching_input_field = input_schema.field(idx);
+                            if col.name() != matching_input_field.name() {
+                                return internal_err!("Input field name {} does 
not match with the projection expression {}",
+                                    matching_input_field.name(),col.name())
+                                }
                             let matching_input_column =
                                 Column::new(matching_input_field.name(), idx);
                             
Ok(Transformed::yes(Arc::new(matching_input_column)))
diff --git a/datafusion/sqllogictest/test_files/agg_func_substitute.slt b/datafusion/sqllogictest/test_files/agg_func_substitute.slt
index 811bfa864f..7beb20a521 100644
--- a/datafusion/sqllogictest/test_files/agg_func_substitute.slt
+++ b/datafusion/sqllogictest/test_files/agg_func_substitute.slt
@@ -44,11 +44,11 @@ logical_plan
 03)----TableScan: multiple_ordered_table projection=[a, c]
 physical_plan
 01)ProjectionExec: expr=[a@0 as a, 
NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c 
ASC NULLS LAST]@1 as result]
-02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], 
aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1))], ordering_mode=Sorted
+02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], 
aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY 
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
 03)----SortExec: expr=[a@0 ASC NULLS LAST]
 04)------CoalesceBatchesExec: target_batch_size=8192
 05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
-06)----------AggregateExec: mode=Partial, gby=[a@0 as a], 
aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1))], ordering_mode=Sorted
+06)----------AggregateExec: mode=Partial, gby=[a@0 as a], 
aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY 
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
 07)------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
 08)--------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], 
output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true
 
@@ -64,11 +64,11 @@ logical_plan
 03)----TableScan: multiple_ordered_table projection=[a, c]
 physical_plan
 01)ProjectionExec: expr=[a@0 as a, 
NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c 
ASC NULLS LAST]@1 as result]
-02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], 
aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1))], ordering_mode=Sorted
+02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], 
aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY 
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
 03)----SortExec: expr=[a@0 ASC NULLS LAST]
 04)------CoalesceBatchesExec: target_batch_size=8192
 05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
-06)----------AggregateExec: mode=Partial, gby=[a@0 as a], 
aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1))], ordering_mode=Sorted
+06)----------AggregateExec: mode=Partial, gby=[a@0 as a], 
aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY 
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
 07)------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
 08)--------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], 
output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true
 
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt
index 5e4fb10456..8b5b84e766 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -132,9 +132,9 @@ logical_plan
 01)Aggregate: groupBy=[[]], aggr=[[ARRAY_AGG(agg_order.c1) ORDER BY 
[agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]]
 02)--TableScan: agg_order projection=[c1, c2, c3]
 physical_plan
-01)AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(agg_order.c1)]
+01)AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(agg_order.c1) ORDER BY 
[agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]
 02)--CoalescePartitionsExec
-03)----AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(agg_order.c1)]
+03)----AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(agg_order.c1) 
ORDER BY [agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]
 04)------SortExec: expr=[c2@1 DESC,c3@2 ASC NULLS LAST]
 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
 06)----------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_agg_multi_order.csv]]}, 
projection=[c1, c2, c3], has_header=true
@@ -3520,9 +3520,9 @@ logical_plan
 01)Aggregate: groupBy=[[]], aggr=[[FIRST_VALUE(convert_first_last_table.c1) 
ORDER BY [convert_first_last_table.c3 DESC NULLS FIRST]]]
 02)--TableScan: convert_first_last_table projection=[c1, c3]
 physical_plan
-01)AggregateExec: mode=Final, gby=[], 
aggr=[FIRST_VALUE(convert_first_last_table.c1)]
+01)AggregateExec: mode=Final, gby=[], 
aggr=[FIRST_VALUE(convert_first_last_table.c1) ORDER BY 
[convert_first_last_table.c3 DESC NULLS FIRST]]
 02)--CoalescePartitionsExec
-03)----AggregateExec: mode=Partial, gby=[], 
aggr=[LAST_VALUE(convert_first_last_table.c1)]
+03)----AggregateExec: mode=Partial, gby=[], 
aggr=[LAST_VALUE(convert_first_last_table.c1) ORDER BY 
[convert_first_last_table.c3 ASC NULLS LAST]]
 04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
 05)--------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/convert_first_last.csv]]}, 
projection=[c1, c3], output_orderings=[[c1@0 ASC NULLS LAST], [c3@1 ASC NULLS 
LAST]], has_header=true
 
@@ -3534,8 +3534,8 @@ logical_plan
 01)Aggregate: groupBy=[[]], aggr=[[LAST_VALUE(convert_first_last_table.c1) 
ORDER BY [convert_first_last_table.c2 ASC NULLS LAST]]]
 02)--TableScan: convert_first_last_table projection=[c1, c2]
 physical_plan
-01)AggregateExec: mode=Final, gby=[], 
aggr=[LAST_VALUE(convert_first_last_table.c1)]
+01)AggregateExec: mode=Final, gby=[], 
aggr=[LAST_VALUE(convert_first_last_table.c1) ORDER BY 
[convert_first_last_table.c2 ASC NULLS LAST]]
 02)--CoalescePartitionsExec
-03)----AggregateExec: mode=Partial, gby=[], 
aggr=[FIRST_VALUE(convert_first_last_table.c1)]
+03)----AggregateExec: mode=Partial, gby=[], 
aggr=[FIRST_VALUE(convert_first_last_table.c1) ORDER BY 
[convert_first_last_table.c2 DESC NULLS FIRST]]
 04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
 05)--------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/convert_first_last.csv]]}, 
projection=[c1, c2], output_orderings=[[c1@0 ASC NULLS LAST], [c2@1 DESC]], 
has_header=true
diff --git a/datafusion/sqllogictest/test_files/distinct_on.slt b/datafusion/sqllogictest/test_files/distinct_on.slt
index fc0df1d5f6..972c935cee 100644
--- a/datafusion/sqllogictest/test_files/distinct_on.slt
+++ b/datafusion/sqllogictest/test_files/distinct_on.slt
@@ -97,10 +97,10 @@ physical_plan
 01)ProjectionExec: expr=[FIRST_VALUE(aggregate_test_100.c3) ORDER BY 
[aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]@1 
as c3, FIRST_VALUE(aggregate_test_100.c2) ORDER BY [aggregate_test_100.c1 ASC 
NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]@2 as c2]
 02)--SortPreservingMergeExec: [c1@0 ASC NULLS LAST]
 03)----SortExec: expr=[c1@0 ASC NULLS LAST]
-04)------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], 
aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)]
+04)------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], 
aggr=[FIRST_VALUE(aggregate_test_100.c3) ORDER BY [aggregate_test_100.c1 ASC 
NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST], 
FIRST_VALUE(aggregate_test_100.c2) ORDER BY [aggregate_test_100.c1 ASC NULLS 
LAST, aggregate_test_100.c3 ASC NULLS LAST]]
 05)--------CoalesceBatchesExec: target_batch_size=8192
 06)----------RepartitionExec: partitioning=Hash([c1@0], 4), input_partitions=4
-07)------------AggregateExec: mode=Partial, gby=[c1@0 as c1], 
aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)]
+07)------------AggregateExec: mode=Partial, gby=[c1@0 as c1], 
aggr=[FIRST_VALUE(aggregate_test_100.c3) ORDER BY [aggregate_test_100.c1 ASC 
NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST], 
FIRST_VALUE(aggregate_test_100.c2) ORDER BY [aggregate_test_100.c1 ASC NULLS 
LAST, aggregate_test_100.c3 ASC NULLS LAST]]
 08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
 09)----------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, 
c2, c3], has_header=true
 
diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt
index 7bef738337..e015f7b01d 100644
--- a/datafusion/sqllogictest/test_files/group_by.slt
+++ b/datafusion/sqllogictest/test_files/group_by.slt
@@ -2016,10 +2016,10 @@ physical_plan
 01)SortPreservingMergeExec: [col0@0 ASC NULLS LAST]
 02)--SortExec: expr=[col0@0 ASC NULLS LAST]
 03)----ProjectionExec: expr=[col0@0 as col0, LAST_VALUE(r.col1) ORDER BY 
[r.col0 ASC NULLS LAST]@3 as last_col1]
-04)------AggregateExec: mode=FinalPartitioned, gby=[col0@0 as col0, col1@1 as 
col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)]
+04)------AggregateExec: mode=FinalPartitioned, gby=[col0@0 as col0, col1@1 as 
col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1) ORDER BY [r.col0 ASC NULLS 
LAST]]
 05)--------CoalesceBatchesExec: target_batch_size=8192
 06)----------RepartitionExec: partitioning=Hash([col0@0, col1@1, col2@2], 4), 
input_partitions=4
-07)------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as 
col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)]
+07)------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as 
col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1) ORDER BY [r.col0 ASC NULLS 
LAST]]
 08)--------------ProjectionExec: expr=[col0@2 as col0, col1@3 as col1, col2@4 
as col2, col0@0 as col0, col1@1 as col1]
 09)----------------CoalesceBatchesExec: target_batch_size=8192
 10)------------------HashJoinExec: mode=Partitioned, join_type=Inner, 
on=[(col0@0, col0@0)]
@@ -2145,7 +2145,7 @@ logical_plan
 03)----TableScan: annotated_data_infinite2 projection=[a, c, d]
 physical_plan
 01)ProjectionExec: expr=[a@1 as a, d@0 as d, SUM(annotated_data_infinite2.c) 
ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]@2 as summation1]
-02)--AggregateExec: mode=Single, gby=[d@2 as d, a@0 as a], 
aggr=[SUM(annotated_data_infinite2.c)], ordering_mode=PartiallySorted([1])
+02)--AggregateExec: mode=Single, gby=[d@2 as d, a@0 as a], 
aggr=[SUM(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC 
NULLS FIRST]], ordering_mode=PartiallySorted([1])
 03)----StreamingTableExec: partition_sizes=1, projection=[a, c, d], 
infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]
 
 query III
@@ -2178,7 +2178,7 @@ logical_plan
 03)----TableScan: annotated_data_infinite2 projection=[a, b, c]
 physical_plan
 01)ProjectionExec: expr=[a@0 as a, b@1 as b, 
FIRST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a 
DESC NULLS FIRST]@2 as first_c]
-02)--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], 
aggr=[FIRST_VALUE(annotated_data_infinite2.c)], ordering_mode=Sorted
+02)--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], 
aggr=[FIRST_VALUE(annotated_data_infinite2.c) ORDER BY 
[annotated_data_infinite2.a DESC NULLS FIRST]], ordering_mode=Sorted
 03)----StreamingTableExec: partition_sizes=1, projection=[a, b, c], 
infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, 
c@2 ASC NULLS LAST]
 
 query III
@@ -2204,7 +2204,7 @@ logical_plan
 03)----TableScan: annotated_data_infinite2 projection=[a, b, c]
 physical_plan
 01)ProjectionExec: expr=[a@0 as a, b@1 as b, 
LAST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a 
DESC NULLS FIRST]@2 as last_c]
-02)--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], 
aggr=[LAST_VALUE(annotated_data_infinite2.c)], ordering_mode=Sorted
+02)--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], 
aggr=[LAST_VALUE(annotated_data_infinite2.c) ORDER BY 
[annotated_data_infinite2.a DESC NULLS FIRST]], ordering_mode=Sorted
 03)----StreamingTableExec: partition_sizes=1, projection=[a, b, c], 
infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, 
c@2 ASC NULLS LAST]
 
 query III
@@ -2292,7 +2292,7 @@ logical_plan
 01)Aggregate: groupBy=[[annotated_data_infinite2.a, 
annotated_data_infinite2.b]], aggr=[[ARRAY_AGG(annotated_data_infinite2.d) 
ORDER BY [annotated_data_infinite2.d ASC NULLS LAST]]]
 02)--TableScan: annotated_data_infinite2 projection=[a, b, d]
 physical_plan
-01)AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], 
aggr=[ARRAY_AGG(annotated_data_infinite2.d)], ordering_mode=Sorted
+01)AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], 
aggr=[ARRAY_AGG(annotated_data_infinite2.d) ORDER BY 
[annotated_data_infinite2.d ASC NULLS LAST]], ordering_mode=Sorted
 02)--PartialSortExec: expr=[a@0 ASC NULLS LAST,b@1 ASC NULLS LAST,d@2 ASC 
NULLS LAST], common_prefix_length=[2]
 03)----StreamingTableExec: partition_sizes=1, projection=[a, b, d], 
infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST]
 
@@ -2464,7 +2464,7 @@ logical_plan
 03)----TableScan: sales_global projection=[country, amount]
 physical_plan
 01)ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) 
ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts]
-02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[ARRAY_AGG(sales_global.amount)]
+02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]]
 03)----SortExec: expr=[amount@1 ASC NULLS LAST]
 04)------MemoryExec: partitions=1, partition_sizes=[1]
 
@@ -2494,7 +2494,7 @@ logical_plan
 04)------TableScan: sales_global projection=[country, amount]
 physical_plan
 01)ProjectionExec: expr=[country@0 as country, ARRAY_AGG(s.amount) ORDER BY 
[s.amount DESC NULLS FIRST]@1 as amounts, SUM(s.amount)@2 as sum1]
-02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[ARRAY_AGG(s.amount), SUM(s.amount)]
+02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST], SUM(s.amount)]
 03)----SortExec: expr=[amount@1 DESC]
 04)------MemoryExec: partitions=1, partition_sizes=[1]
 
@@ -2538,7 +2538,7 @@ logical_plan
 05)--------TableScan: sales_global projection=[country, amount]
 physical_plan
 01)ProjectionExec: expr=[country@0 as country, ARRAY_AGG(s.amount) ORDER BY 
[s.amount DESC NULLS FIRST]@1 as amounts, SUM(s.amount)@2 as sum1]
-02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[ARRAY_AGG(s.amount), SUM(s.amount)], ordering_mode=Sorted
+02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST], SUM(s.amount)], 
ordering_mode=Sorted
 03)----SortExec: expr=[country@0 ASC NULLS LAST,amount@1 DESC]
 04)------MemoryExec: partitions=1, partition_sizes=[1]
 
@@ -2574,7 +2574,7 @@ logical_plan
 05)--------TableScan: sales_global projection=[zip_code, country, amount]
 physical_plan
 01)ProjectionExec: expr=[country@0 as country, zip_code@1 as zip_code, 
ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST]@2 as amounts, 
SUM(s.amount)@3 as sum1]
-02)--AggregateExec: mode=Single, gby=[country@1 as country, zip_code@0 as 
zip_code], aggr=[ARRAY_AGG(s.amount), SUM(s.amount)], 
ordering_mode=PartiallySorted([0])
+02)--AggregateExec: mode=Single, gby=[country@1 as country, zip_code@0 as 
zip_code], aggr=[ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST], 
SUM(s.amount)], ordering_mode=PartiallySorted([0])
 03)----SortExec: expr=[country@1 ASC NULLS LAST,amount@2 DESC]
 04)------MemoryExec: partitions=1, partition_sizes=[1]
 
@@ -2610,7 +2610,7 @@ logical_plan
 05)--------TableScan: sales_global projection=[country, amount]
 physical_plan
 01)ProjectionExec: expr=[country@0 as country, ARRAY_AGG(s.amount) ORDER BY 
[s.country DESC NULLS FIRST]@1 as amounts, SUM(s.amount)@2 as sum1]
-02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[ARRAY_AGG(s.amount), SUM(s.amount)], ordering_mode=Sorted
+02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[ARRAY_AGG(s.amount) ORDER BY [s.country DESC NULLS FIRST], 
SUM(s.amount)], ordering_mode=Sorted
 03)----SortExec: expr=[country@0 ASC NULLS LAST]
 04)------MemoryExec: partitions=1, partition_sizes=[1]
 
@@ -2645,7 +2645,7 @@ logical_plan
 05)--------TableScan: sales_global projection=[country, amount]
 physical_plan
 01)ProjectionExec: expr=[country@0 as country, ARRAY_AGG(s.amount) ORDER BY 
[s.country DESC NULLS FIRST, s.amount DESC NULLS FIRST]@1 as amounts, 
SUM(s.amount)@2 as sum1]
-02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[ARRAY_AGG(s.amount), SUM(s.amount)], ordering_mode=Sorted
+02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[ARRAY_AGG(s.amount) ORDER BY [s.country DESC NULLS FIRST, s.amount DESC 
NULLS FIRST], SUM(s.amount)], ordering_mode=Sorted
 03)----SortExec: expr=[country@0 ASC NULLS LAST,amount@1 DESC]
 04)------MemoryExec: partitions=1, partition_sizes=[1]
 
@@ -2677,7 +2677,7 @@ logical_plan
 03)----TableScan: sales_global projection=[country, amount]
 physical_plan
 01)ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) 
ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts, 
FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount 
DESC NULLS FIRST]@3 as fv2]
-02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), 
LAST_VALUE(sales_global.amount)]
+02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS 
FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC 
NULLS FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount 
DESC NULLS FIRST]]
 03)----SortExec: expr=[amount@1 DESC]
 04)------MemoryExec: partitions=1, partition_sizes=[1]
 
@@ -2708,7 +2708,7 @@ logical_plan
 03)----TableScan: sales_global projection=[country, amount]
 physical_plan
 01)ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) 
ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts, 
FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount 
DESC NULLS FIRST]@3 as fv2]
-02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), 
FIRST_VALUE(sales_global.amount)]
+02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST], FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST], FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]]
 03)----SortExec: expr=[amount@1 ASC NULLS LAST]
 04)------MemoryExec: partitions=1, partition_sizes=[1]
 
@@ -2740,7 +2740,7 @@ logical_plan
 03)----TableScan: sales_global projection=[country, amount]
 physical_plan
 01)ProjectionExec: expr=[country@0 as country, 
FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount 
DESC NULLS FIRST]@2 as fv2, ARRAY_AGG(sales_global.amount) ORDER BY 
[sales_global.amount ASC NULLS LAST]@3 as amounts]
-02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount), 
ARRAY_AGG(sales_global.amount)]
+02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST], FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST], ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]]
 03)----SortExec: expr=[amount@1 ASC NULLS LAST]
 04)------MemoryExec: partitions=1, partition_sizes=[1]
 
@@ -2770,7 +2770,7 @@ logical_plan
 03)----TableScan: sales_global projection=[country, ts, amount]
 physical_plan
 01)ProjectionExec: expr=[country@0 as country, SUM(sales_global.amount) ORDER 
BY [sales_global.ts DESC NULLS FIRST]@1 as sum1, ARRAY_AGG(sales_global.amount) 
ORDER BY [sales_global.amount ASC NULLS LAST]@2 as amounts]
-02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[SUM(sales_global.amount), ARRAY_AGG(sales_global.amount)]
+02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], 
ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]]
 03)----SortExec: expr=[amount@2 ASC NULLS LAST]
 04)------MemoryExec: partitions=1, partition_sizes=[1]
 
@@ -2805,7 +2805,7 @@ logical_plan
 04)------TableScan: sales_global projection=[country, ts, amount]
 physical_plan
 01)ProjectionExec: expr=[country@0 as country, 
FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 
as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS 
FIRST]@2 as lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS 
FIRST]@3 as sum1]
-02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount), 
SUM(sales_global.amount)]
+02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS 
FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS 
FIRST], SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]]
 03)----MemoryExec: partitions=1, partition_sizes=[1]
 
 query TRRR rowsort
@@ -2838,7 +2838,7 @@ logical_plan
 03)----TableScan: sales_global projection=[country, ts, amount]
 physical_plan
 01)ProjectionExec: expr=[country@0 as country, 
FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 
as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS 
FIRST]@2 as lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS 
FIRST]@3 as sum1]
-02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount), 
SUM(sales_global.amount)]
+02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS 
FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS 
FIRST], SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]]
 03)----MemoryExec: partitions=1, partition_sizes=[1]
 
 query TRRR rowsort
@@ -2874,7 +2874,7 @@ logical_plan
 physical_plan
 01)SortExec: expr=[sn@2 ASC NULLS LAST]
 02)--ProjectionExec: expr=[zip_code@1 as zip_code, country@2 as country, sn@0 
as sn, ts@3 as ts, currency@4 as currency, LAST_VALUE(e.amount) ORDER BY [e.sn 
ASC NULLS LAST]@5 as last_rate]
-03)----AggregateExec: mode=Single, gby=[sn@2 as sn, zip_code@0 as zip_code, 
country@1 as country, ts@3 as ts, currency@4 as currency], 
aggr=[LAST_VALUE(e.amount)]
+03)----AggregateExec: mode=Single, gby=[sn@2 as sn, zip_code@0 as zip_code, 
country@1 as country, ts@3 as ts, currency@4 as currency], 
aggr=[LAST_VALUE(e.amount) ORDER BY [e.sn ASC NULLS LAST]]
 04)------ProjectionExec: expr=[zip_code@2 as zip_code, country@3 as country, 
sn@4 as sn, ts@5 as ts, currency@6 as currency, sn@0 as sn, amount@1 as amount]
 05)--------CoalesceBatchesExec: target_batch_size=8192
 06)----------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(currency@2, 
currency@4)], filter=ts@0 >= ts@1, projection=[sn@0, amount@3, zip_code@4, 
country@5, sn@6, ts@7, currency@8]
@@ -2919,11 +2919,11 @@ physical_plan
 01)SortPreservingMergeExec: [country@0 ASC NULLS LAST]
 02)--SortExec: expr=[country@0 ASC NULLS LAST]
 03)----ProjectionExec: expr=[country@0 as country, 
FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as 
fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS 
LAST]@2 as fv2]
-04)------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], 
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+04)------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], 
aggr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS 
LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS 
LAST]]
 05)--------CoalesceBatchesExec: target_batch_size=8192
 06)----------RepartitionExec: partitioning=Hash([country@0], 8), 
input_partitions=8
 07)------------RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1
-08)--------------AggregateExec: mode=Partial, gby=[country@0 as country], 
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+08)--------------AggregateExec: mode=Partial, gby=[country@0 as country], 
aggr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS 
LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS 
LAST]]
 09)----------------MemoryExec: partitions=1, partition_sizes=[1]
 
 query TRR
@@ -2955,11 +2955,11 @@ physical_plan
 01)SortPreservingMergeExec: [country@0 ASC NULLS LAST]
 02)--SortExec: expr=[country@0 ASC NULLS LAST]
 03)----ProjectionExec: expr=[country@0 as country, 
FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as 
fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS 
FIRST]@2 as fv2]
-04)------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], 
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+04)------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], 
aggr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS 
LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS 
FIRST]]
 05)--------CoalesceBatchesExec: target_batch_size=8192
 06)----------RepartitionExec: partitioning=Hash([country@0], 8), 
input_partitions=8
 07)------------RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1
-08)--------------AggregateExec: mode=Partial, gby=[country@0 as country], 
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+08)--------------AggregateExec: mode=Partial, gby=[country@0 as country], 
aggr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS 
LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS 
FIRST]]
 09)----------------MemoryExec: partitions=1, partition_sizes=[1]
 
 query TRR
@@ -2991,9 +2991,9 @@ logical_plan
 03)----TableScan: sales_global projection=[ts, amount]
 physical_plan
 01)ProjectionExec: expr=[FIRST_VALUE(sales_global.amount) ORDER BY 
[sales_global.ts ASC NULLS LAST]@0 as fv1, LAST_VALUE(sales_global.amount) 
ORDER BY [sales_global.ts ASC NULLS LAST]@1 as fv2]
-02)--AggregateExec: mode=Final, gby=[], 
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+02)--AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(sales_global.amount) 
ORDER BY [sales_global.ts ASC NULLS LAST], LAST_VALUE(sales_global.amount) 
ORDER BY [sales_global.ts ASC NULLS LAST]]
 03)----CoalescePartitionsExec
-04)------AggregateExec: mode=Partial, gby=[], 
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+04)------AggregateExec: mode=Partial, gby=[], 
aggr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS 
LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS 
LAST]]
 05)--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
 06)----------MemoryExec: partitions=1, partition_sizes=[1]
 
@@ -3017,9 +3017,9 @@ logical_plan
 03)----TableScan: sales_global projection=[ts, amount]
 physical_plan
 01)ProjectionExec: expr=[FIRST_VALUE(sales_global.amount) ORDER BY 
[sales_global.ts ASC NULLS LAST]@0 as fv1, LAST_VALUE(sales_global.amount) 
ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv2]
-02)--AggregateExec: mode=Final, gby=[], 
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+02)--AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(sales_global.amount) 
ORDER BY [sales_global.ts ASC NULLS LAST], LAST_VALUE(sales_global.amount) 
ORDER BY [sales_global.ts DESC NULLS FIRST]]
 03)----CoalescePartitionsExec
-04)------AggregateExec: mode=Partial, gby=[], 
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+04)------AggregateExec: mode=Partial, gby=[], 
aggr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS 
LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS 
FIRST]]
 05)--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
 06)----------MemoryExec: partitions=1, partition_sizes=[1]
 
@@ -3041,9 +3041,9 @@ logical_plan
 03)----TableScan: sales_global projection=[ts, amount]
 physical_plan
 01)ProjectionExec: expr=[ARRAY_AGG(sales_global.amount) ORDER BY 
[sales_global.ts ASC NULLS LAST]@0 as array_agg1]
-02)--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(sales_global.amount)]
+02)--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(sales_global.amount) 
ORDER BY [sales_global.ts ASC NULLS LAST]]
 03)----CoalescePartitionsExec
-04)------AggregateExec: mode=Partial, gby=[], 
aggr=[ARRAY_AGG(sales_global.amount)]
+04)------AggregateExec: mode=Partial, gby=[], 
aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]]
 05)--------SortExec: expr=[ts@0 ASC NULLS LAST]
 06)----------RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1
 07)------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -3065,9 +3065,9 @@ logical_plan
 03)----TableScan: sales_global projection=[ts, amount]
 physical_plan
 01)ProjectionExec: expr=[ARRAY_AGG(sales_global.amount) ORDER BY 
[sales_global.ts DESC NULLS FIRST]@0 as array_agg1]
-02)--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(sales_global.amount)]
+02)--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(sales_global.amount) 
ORDER BY [sales_global.ts DESC NULLS FIRST]]
 03)----CoalescePartitionsExec
-04)------AggregateExec: mode=Partial, gby=[], 
aggr=[ARRAY_AGG(sales_global.amount)]
+04)------AggregateExec: mode=Partial, gby=[], 
aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS 
FIRST]]
 05)--------SortExec: expr=[ts@0 DESC]
 06)----------RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1
 07)------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -3089,9 +3089,9 @@ logical_plan
 03)----TableScan: sales_global projection=[amount]
 physical_plan
 01)ProjectionExec: expr=[ARRAY_AGG(sales_global.amount) ORDER BY 
[sales_global.amount ASC NULLS LAST]@0 as array_agg1]
-02)--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(sales_global.amount)]
+02)--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(sales_global.amount) 
ORDER BY [sales_global.amount ASC NULLS LAST]]
 03)----CoalescePartitionsExec
-04)------AggregateExec: mode=Partial, gby=[], 
aggr=[ARRAY_AGG(sales_global.amount)]
+04)------AggregateExec: mode=Partial, gby=[], 
aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]]
 05)--------SortExec: expr=[amount@0 ASC NULLS LAST]
 06)----------RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1
 07)------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -3118,10 +3118,10 @@ physical_plan
 01)SortPreservingMergeExec: [country@0 ASC NULLS LAST]
 02)--SortExec: expr=[country@0 ASC NULLS LAST]
 03)----ProjectionExec: expr=[country@0 as country, 
ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 
as array_agg1]
-04)------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], 
aggr=[ARRAY_AGG(sales_global.amount)]
+04)------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], 
aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]]
 05)--------CoalesceBatchesExec: target_batch_size=4
 06)----------RepartitionExec: partitioning=Hash([country@0], 8), 
input_partitions=8
-07)------------AggregateExec: mode=Partial, gby=[country@0 as country], 
aggr=[ARRAY_AGG(sales_global.amount)]
+07)------------AggregateExec: mode=Partial, gby=[country@0 as country], 
aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]]
 08)--------------SortExec: expr=[amount@1 ASC NULLS LAST]
 09)----------------RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1
 10)------------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -3154,10 +3154,10 @@ physical_plan
 01)SortPreservingMergeExec: [country@0 ASC NULLS LAST]
 02)--SortExec: expr=[country@0 ASC NULLS LAST]
 03)----ProjectionExec: expr=[country@0 as country, 
ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS 
FIRST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY 
[sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) 
ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2]
-04)------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], 
aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), 
LAST_VALUE(sales_global.amount)]
+04)------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], 
aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS 
FIRST], FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC 
NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC 
NULLS FIRST]]
 05)--------CoalesceBatchesExec: target_batch_size=4
 06)----------RepartitionExec: partitioning=Hash([country@0], 8), 
input_partitions=8
-07)------------AggregateExec: mode=Partial, gby=[country@0 as country], 
aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), 
LAST_VALUE(sales_global.amount)]
+07)------------AggregateExec: mode=Partial, gby=[country@0 as country], 
aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS 
FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC 
NULLS FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount 
DESC NULLS FIRST]]
 08)--------------SortExec: expr=[amount@1 DESC]
 09)----------------RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1
 10)------------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -3800,10 +3800,10 @@ logical_plan
 03)----TableScan: multiple_ordered_table projection=[a, c, d]
 physical_plan
 01)ProjectionExec: expr=[FIRST_VALUE(multiple_ordered_table.a) ORDER BY 
[multiple_ordered_table.a ASC NULLS LAST]@1 as first_a, 
LAST_VALUE(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC 
NULLS FIRST]@2 as last_c]
-02)--AggregateExec: mode=FinalPartitioned, gby=[d@0 as d], 
aggr=[FIRST_VALUE(multiple_ordered_table.a), 
LAST_VALUE(multiple_ordered_table.c)]
+02)--AggregateExec: mode=FinalPartitioned, gby=[d@0 as d], 
aggr=[FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a 
ASC NULLS LAST], LAST_VALUE(multiple_ordered_table.c) ORDER BY 
[multiple_ordered_table.c DESC NULLS FIRST]]
 03)----CoalesceBatchesExec: target_batch_size=2
 04)------RepartitionExec: partitioning=Hash([d@0], 8), input_partitions=8
-05)--------AggregateExec: mode=Partial, gby=[d@2 as d], 
aggr=[FIRST_VALUE(multiple_ordered_table.a), 
FIRST_VALUE(multiple_ordered_table.c)]
+05)--------AggregateExec: mode=Partial, gby=[d@2 as d], 
aggr=[FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a 
ASC NULLS LAST], FIRST_VALUE(multiple_ordered_table.c) ORDER BY 
[multiple_ordered_table.c ASC NULLS LAST]]
 06)----------RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1
 07)------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, 
d], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], 
has_header=true
 
@@ -3870,7 +3870,7 @@ logical_plan
 12)------------------TableScan: multiple_ordered_table projection=[a, d]
 physical_plan
 01)ProjectionExec: expr=[LAST_VALUE(l.d) ORDER BY [l.a ASC NULLS LAST]@1 as 
amount_usd]
-02)--AggregateExec: mode=Single, gby=[row_n@2 as row_n], 
aggr=[LAST_VALUE(l.d)], ordering_mode=Sorted
+02)--AggregateExec: mode=Single, gby=[row_n@2 as row_n], aggr=[LAST_VALUE(l.d) 
ORDER BY [l.a ASC NULLS LAST]], ordering_mode=Sorted
 03)----CoalesceBatchesExec: target_batch_size=2
 04)------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d@1, d@1)], 
filter=CAST(a@0 AS Int64) >= CAST(a@1 AS Int64) - 10, projection=[a@0, d@1, 
row_n@4]
 05)--------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, d], 
output_ordering=[a@0 ASC NULLS LAST], has_header=true
@@ -4974,7 +4974,7 @@ logical_plan
 02)--Aggregate: groupBy=[[multiple_ordered_table.a, 
multiple_ordered_table.b]], aggr=[[ARRAY_AGG(multiple_ordered_table.c) ORDER BY 
[multiple_ordered_table.c DESC NULLS FIRST]]]
 03)----TableScan: multiple_ordered_table projection=[a, b, c]
 physical_plan
-01)AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], 
aggr=[ARRAY_AGG(multiple_ordered_table.c)], ordering_mode=Sorted
+01)AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], 
aggr=[ARRAY_AGG(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c 
DESC NULLS FIRST]], ordering_mode=Sorted
 02)--CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, 
c], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS 
LAST]], has_header=true
 
 query II?
diff --git a/datafusion/sqllogictest/test_files/joins.slt 
b/datafusion/sqllogictest/test_files/joins.slt
index 65e6c17b92..5ef33307b5 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -3362,7 +3362,7 @@ logical_plan
 08)----------TableScan: annotated_data projection=[a, b]
 physical_plan
 01)ProjectionExec: expr=[a@0 as a, LAST_VALUE(r.b) ORDER BY [r.a ASC NULLS 
FIRST]@3 as last_col1]
-02)--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b, c@2 as c], 
aggr=[LAST_VALUE(r.b)], ordering_mode=PartiallySorted([0])
+02)--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b, c@2 as c], 
aggr=[LAST_VALUE(r.b) ORDER BY [r.a ASC NULLS FIRST]], 
ordering_mode=PartiallySorted([0])
 03)----CoalesceBatchesExec: target_batch_size=2
 04)------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0)]
 05)--------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, 
c], output_ordering=[a@0 ASC, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], 
has_header=true
@@ -3410,7 +3410,7 @@ logical_plan
 12)------------------TableScan: multiple_ordered_table projection=[a, d]
 physical_plan
 01)ProjectionExec: expr=[LAST_VALUE(l.d) ORDER BY [l.a ASC NULLS LAST]@1 as 
amount_usd]
-02)--AggregateExec: mode=Single, gby=[row_n@2 as row_n], 
aggr=[LAST_VALUE(l.d)], ordering_mode=Sorted
+02)--AggregateExec: mode=Single, gby=[row_n@2 as row_n], aggr=[LAST_VALUE(l.d) 
ORDER BY [l.a ASC NULLS LAST]], ordering_mode=Sorted
 03)----CoalesceBatchesExec: target_batch_size=2
 04)------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d@1, d@1)], 
filter=CAST(a@0 AS Int64) >= CAST(a@1 AS Int64) - 10, projection=[a@0, d@1, 
row_n@4]
 05)--------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, d], 
output_ordering=[a@0 ASC NULLS LAST], has_header=true
@@ -3447,10 +3447,10 @@ physical_plan
 01)SortPreservingMergeExec: [a@0 ASC]
 02)--SortExec: expr=[a@0 ASC]
 03)----ProjectionExec: expr=[a@0 as a, LAST_VALUE(r.b) ORDER BY [r.a ASC NULLS 
FIRST]@3 as last_col1]
-04)------AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as 
c], aggr=[LAST_VALUE(r.b)]
+04)------AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as 
c], aggr=[LAST_VALUE(r.b) ORDER BY [r.a ASC NULLS FIRST]]
 05)--------CoalesceBatchesExec: target_batch_size=2
 06)----------RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 2), 
input_partitions=2
-07)------------AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as 
c], aggr=[LAST_VALUE(r.b)]
+07)------------AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as 
c], aggr=[LAST_VALUE(r.b) ORDER BY [r.a ASC NULLS FIRST]]
 08)--------------CoalesceBatchesExec: target_batch_size=2
 09)----------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, 
a@0)]
 10)------------------CoalesceBatchesExec: target_batch_size=2
diff --git a/datafusion/sqllogictest/test_files/subquery.slt 
b/datafusion/sqllogictest/test_files/subquery.slt
index 155a176e85..7196418af1 100644
--- a/datafusion/sqllogictest/test_files/subquery.slt
+++ b/datafusion/sqllogictest/test_files/subquery.slt
@@ -728,6 +728,32 @@ logical_plan
 07)------Aggregate: groupBy=[[]], aggr=[[COUNT(Int64(1))]]
 08)--------TableScan: t2 projection=[]
 
+statement ok
+set datafusion.explain.logical_plan_only = false;
+
+query TT
+explain select (select count(*) from t1) as b, (select count(1) from t2)
+----
+logical_plan
+01)Projection: __scalar_sq_1.COUNT(*) AS b, __scalar_sq_2.COUNT(Int64(1)) AS 
COUNT(Int64(1))
+02)--Left Join: 
+03)----SubqueryAlias: __scalar_sq_1
+04)------Aggregate: groupBy=[[]], aggr=[[COUNT(Int64(1)) AS COUNT(*)]]
+05)--------TableScan: t1 projection=[]
+06)----SubqueryAlias: __scalar_sq_2
+07)------Aggregate: groupBy=[[]], aggr=[[COUNT(Int64(1))]]
+08)--------TableScan: t2 projection=[]
+physical_plan
+01)ProjectionExec: expr=[COUNT(*)@0 as b, COUNT(Int64(1))@1 as COUNT(Int64(1))]
+02)--NestedLoopJoinExec: join_type=Left
+03)----ProjectionExec: expr=[4 as COUNT(*)]
+04)------PlaceholderRowExec
+05)----ProjectionExec: expr=[4 as COUNT(Int64(1))]
+06)------PlaceholderRowExec
+
+statement ok
+set datafusion.explain.logical_plan_only = true;
+
 query II
 select (select count(*) from t1) as b, (select count(1) from t2)
 ----


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to