This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new fc8e7b9035 Remove ArrayAgg Builtin in favor of UDF (#11611)
fc8e7b9035 is described below

commit fc8e7b90356b94af5f591240b8165bc4c8275a51
Author: Jay Zhan <[email protected]>
AuthorDate: Wed Jul 24 04:52:45 2024 +0800

    Remove ArrayAgg Builtin in favor of UDF (#11611)
    
    * rm def
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * rewrite test
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * fix
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    ---------
    
    Signed-off-by: jayzhan211 <[email protected]>
---
 datafusion/core/tests/dataframe/mod.rs             |   4 +-
 datafusion/core/tests/sql/aggregates.rs            |   2 +-
 datafusion/expr/src/aggregate_function.rs          |  16 +--
 datafusion/expr/src/type_coercion/aggregates.rs    |   7 +-
 datafusion/expr/src/udaf.rs                        |   2 +-
 datafusion/functions-aggregate/src/array_agg.rs    |   7 +-
 datafusion/functions-array/src/planner.rs          |   2 +-
 .../physical-expr-common/src/aggregate/mod.rs      |   5 +-
 datafusion/physical-expr/src/aggregate/build_in.rs |   4 +-
 datafusion/proto/proto/datafusion.proto            |   2 +-
 datafusion/proto/src/generated/pbjson.rs           |   3 -
 datafusion/proto/src/generated/prost.rs            |   7 +-
 datafusion/proto/src/logical_plan/from_proto.rs    |   1 -
 datafusion/proto/src/logical_plan/to_proto.rs      |   2 -
 datafusion/sqllogictest/test_files/aggregate.slt   |  16 +--
 datafusion/sqllogictest/test_files/binary_view.slt |   2 +-
 datafusion/sqllogictest/test_files/group_by.slt    | 138 ++++++++++-----------
 datafusion/sqllogictest/test_files/window.slt      |  10 +-
 18 files changed, 100 insertions(+), 130 deletions(-)

diff --git a/datafusion/core/tests/dataframe/mod.rs 
b/datafusion/core/tests/dataframe/mod.rs
index d68b806919..bc01ada1e0 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -1389,7 +1389,7 @@ async fn unnest_with_redundant_columns() -> Result<()> {
     let expected = vec![
         "Projection: shapes.shape_id [shape_id:UInt32]",
         "  Unnest: lists[shape_id2] structs[] [shape_id:UInt32, 
shape_id2:UInt32;N]",
-        "    Aggregate: groupBy=[[shapes.shape_id]], 
aggr=[[ARRAY_AGG(shapes.shape_id) AS shape_id2]] [shape_id:UInt32, 
shape_id2:List(Field { name: \"item\", data_type: UInt32, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} });N]",
+        "    Aggregate: groupBy=[[shapes.shape_id]], 
aggr=[[array_agg(shapes.shape_id) AS shape_id2]] [shape_id:UInt32, 
shape_id2:List(Field { name: \"item\", data_type: UInt32, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} });N]",
         "      TableScan: shapes projection=[shape_id] [shape_id:UInt32]",
     ];
 
@@ -1973,7 +1973,7 @@ async fn test_array_agg() -> Result<()> {
 
     let expected = [
         "+-------------------------------------+",
-        "| ARRAY_AGG(test.a)                   |",
+        "| array_agg(test.a)                   |",
         "+-------------------------------------+",
         "| [abcDEF, abc123, CBAdef, 123AbcDef] |",
         "+-------------------------------------+",
diff --git a/datafusion/core/tests/sql/aggregates.rs 
b/datafusion/core/tests/sql/aggregates.rs
index 1f4f9e77d5..1f10cb244e 100644
--- a/datafusion/core/tests/sql/aggregates.rs
+++ b/datafusion/core/tests/sql/aggregates.rs
@@ -35,7 +35,7 @@ async fn csv_query_array_agg_distinct() -> Result<()> {
     assert_eq!(
         *actual[0].schema(),
         Schema::new(vec![Field::new_list(
-            "ARRAY_AGG(DISTINCT aggregate_test_100.c2)",
+            "array_agg(DISTINCT aggregate_test_100.c2)",
             Field::new("item", DataType::UInt32, true),
             true
         ),])
diff --git a/datafusion/expr/src/aggregate_function.rs 
b/datafusion/expr/src/aggregate_function.rs
index 39b3b4ed3b..4037e3c5db 100644
--- a/datafusion/expr/src/aggregate_function.rs
+++ b/datafusion/expr/src/aggregate_function.rs
@@ -17,13 +17,12 @@
 
 //! Aggregate function module contains all built-in aggregate functions 
definitions
 
-use std::sync::Arc;
 use std::{fmt, str::FromStr};
 
 use crate::utils;
 use crate::{type_coercion::aggregates::*, Signature, Volatility};
 
-use arrow::datatypes::{DataType, Field};
+use arrow::datatypes::DataType;
 use datafusion_common::{plan_datafusion_err, plan_err, DataFusionError, 
Result};
 
 use strum_macros::EnumIter;
@@ -37,8 +36,6 @@ pub enum AggregateFunction {
     Min,
     /// Maximum
     Max,
-    /// Aggregation into an array
-    ArrayAgg,
 }
 
 impl AggregateFunction {
@@ -47,7 +44,6 @@ impl AggregateFunction {
         match self {
             Min => "MIN",
             Max => "MAX",
-            ArrayAgg => "ARRAY_AGG",
         }
     }
 }
@@ -65,7 +61,6 @@ impl FromStr for AggregateFunction {
             // general
             "max" => AggregateFunction::Max,
             "min" => AggregateFunction::Min,
-            "array_agg" => AggregateFunction::ArrayAgg,
             _ => {
                 return plan_err!("There is no built-in function named {name}");
             }
@@ -80,7 +75,7 @@ impl AggregateFunction {
     pub fn return_type(
         &self,
         input_expr_types: &[DataType],
-        input_expr_nullable: &[bool],
+        _input_expr_nullable: &[bool],
     ) -> Result<DataType> {
         // Note that this function *must* return the same type that the 
respective physical expression returns
         // or the execution panics.
@@ -105,11 +100,6 @@ impl AggregateFunction {
                 // The coerced_data_types is same with input_types.
                 Ok(coerced_data_types[0].clone())
             }
-            AggregateFunction::ArrayAgg => 
Ok(DataType::List(Arc::new(Field::new(
-                "item",
-                coerced_data_types[0].clone(),
-                input_expr_nullable[0],
-            )))),
         }
     }
 
@@ -118,7 +108,6 @@ impl AggregateFunction {
     pub fn nullable(&self) -> Result<bool> {
         match self {
             AggregateFunction::Max | AggregateFunction::Min => Ok(true),
-            AggregateFunction::ArrayAgg => Ok(true),
         }
     }
 }
@@ -128,7 +117,6 @@ impl AggregateFunction {
     pub fn signature(&self) -> Signature {
         // note: the physical expression must accept the type returned by this 
function or the execution panics.
         match self {
-            AggregateFunction::ArrayAgg => Signature::any(1, 
Volatility::Immutable),
             AggregateFunction::Min | AggregateFunction::Max => {
                 let valid = STRINGS
                     .iter()
diff --git a/datafusion/expr/src/type_coercion/aggregates.rs 
b/datafusion/expr/src/type_coercion/aggregates.rs
index fbec6e2f80..a024401e18 100644
--- a/datafusion/expr/src/type_coercion/aggregates.rs
+++ b/datafusion/expr/src/type_coercion/aggregates.rs
@@ -95,7 +95,6 @@ pub fn coerce_types(
     check_arg_count(agg_fun.name(), input_types, &signature.type_signature)?;
 
     match agg_fun {
-        AggregateFunction::ArrayAgg => Ok(input_types.to_vec()),
         AggregateFunction::Min | AggregateFunction::Max => {
             // min and max support the dictionary data type
             // unpack the dictionary to get the value
@@ -360,11 +359,7 @@ mod tests {
 
         // test count, array_agg, approx_distinct, min, max.
         // the coerced types is same with input types
-        let funs = vec![
-            AggregateFunction::ArrayAgg,
-            AggregateFunction::Min,
-            AggregateFunction::Max,
-        ];
+        let funs = vec![AggregateFunction::Min, AggregateFunction::Max];
         let input_types = vec![
             vec![DataType::Int32],
             vec![DataType::Decimal128(10, 2)],
diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs
index 1657e034fb..2851ca811e 100644
--- a/datafusion/expr/src/udaf.rs
+++ b/datafusion/expr/src/udaf.rs
@@ -542,7 +542,7 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
 pub enum ReversedUDAF {
     /// The expression is the same as the original expression, like SUM, COUNT
     Identical,
-    /// The expression does not support reverse calculation, like ArrayAgg
+    /// The expression does not support reverse calculation
     NotSupported,
     /// The expression is different from the original expression
     Reversed(Arc<AggregateUDF>),
diff --git a/datafusion/functions-aggregate/src/array_agg.rs 
b/datafusion/functions-aggregate/src/array_agg.rs
index 777a242aa2..96b39ae412 100644
--- a/datafusion/functions-aggregate/src/array_agg.rs
+++ b/datafusion/functions-aggregate/src/array_agg.rs
@@ -50,14 +50,12 @@ make_udaf_expr_and_func!(
 /// ARRAY_AGG aggregate expression
 pub struct ArrayAgg {
     signature: Signature,
-    alias: Vec<String>,
 }
 
 impl Default for ArrayAgg {
     fn default() -> Self {
         Self {
             signature: Signature::any(1, Volatility::Immutable),
-            alias: vec!["array_agg".to_string()],
         }
     }
 }
@@ -67,13 +65,12 @@ impl AggregateUDFImpl for ArrayAgg {
         self
     }
 
-    // TODO: change name to lowercase
     fn name(&self) -> &str {
-        "ARRAY_AGG"
+        "array_agg"
     }
 
     fn aliases(&self) -> &[String] {
-        &self.alias
+        &[]
     }
 
     fn signature(&self) -> &Signature {
diff --git a/datafusion/functions-array/src/planner.rs 
b/datafusion/functions-array/src/planner.rs
index c63c2c83e6..3f779c9f11 100644
--- a/datafusion/functions-array/src/planner.rs
+++ b/datafusion/functions-array/src/planner.rs
@@ -172,7 +172,7 @@ impl ExprPlanner for FieldAccessPlanner {
 
 fn is_array_agg(agg_func: &datafusion_expr::expr::AggregateFunction) -> bool {
     if let AggregateFunctionDefinition::UDF(udf) = &agg_func.func_def {
-        return udf.name() == "ARRAY_AGG";
+        return udf.name() == "array_agg";
     }
 
     false
diff --git a/datafusion/physical-expr-common/src/aggregate/mod.rs 
b/datafusion/physical-expr-common/src/aggregate/mod.rs
index 05c7e1caed..8c5f9f9e5a 100644
--- a/datafusion/physical-expr-common/src/aggregate/mod.rs
+++ b/datafusion/physical-expr-common/src/aggregate/mod.rs
@@ -573,8 +573,9 @@ impl AggregateExpr for AggregateFunctionExpr {
                     })
                     .collect::<Vec<_>>();
                 let mut name = self.name().to_string();
-                // TODO: Generalize order-by clause rewrite
-                if reverse_udf.name() == "ARRAY_AGG" {
+                // If the function is changed, we need to reverse order_by 
clause as well
+                // i.e. First(a order by b asc null first) -> Last(a order by 
b desc null last)
+                if self.fun().name() == reverse_udf.name() {
                 } else {
                     replace_order_by_clause(&mut name);
                 }
diff --git a/datafusion/physical-expr/src/aggregate/build_in.rs 
b/datafusion/physical-expr/src/aggregate/build_in.rs
index 27c1533d05..bdc41ff0a9 100644
--- a/datafusion/physical-expr/src/aggregate/build_in.rs
+++ b/datafusion/physical-expr/src/aggregate/build_in.rs
@@ -30,7 +30,7 @@ use std::sync::Arc;
 
 use arrow::datatypes::Schema;
 
-use datafusion_common::{internal_err, Result};
+use datafusion_common::Result;
 use datafusion_expr::AggregateFunction;
 
 use crate::expressions::{self};
@@ -56,7 +56,6 @@ pub fn create_aggregate_expr(
     let data_type = input_phy_types[0].clone();
     let input_phy_exprs = input_phy_exprs.to_vec();
     Ok(match (fun, distinct) {
-        (AggregateFunction::ArrayAgg, _) => return internal_err!("not 
reachable"),
         (AggregateFunction::Min, _) => Arc::new(expressions::Min::new(
             Arc::clone(&input_phy_exprs[0]),
             name,
@@ -123,7 +122,6 @@ mod tests {
                             result_agg_phy_exprs.field().unwrap()
                         );
                     }
-                    _ => {}
                 };
             }
         }
diff --git a/datafusion/proto/proto/datafusion.proto 
b/datafusion/proto/proto/datafusion.proto
index 49d9f2dde6..e133abd46f 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -472,7 +472,7 @@ enum AggregateFunction {
   // AVG = 3;
   // COUNT = 4;
   // APPROX_DISTINCT = 5;
-  ARRAY_AGG = 6;
+  // ARRAY_AGG = 6;
   // VARIANCE = 7;
   // VARIANCE_POP = 8;
   // COVARIANCE = 9;
diff --git a/datafusion/proto/src/generated/pbjson.rs 
b/datafusion/proto/src/generated/pbjson.rs
index 25f6646d2a..c5ec67d728 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -534,7 +534,6 @@ impl serde::Serialize for AggregateFunction {
         let variant = match self {
             Self::Min => "MIN",
             Self::Max => "MAX",
-            Self::ArrayAgg => "ARRAY_AGG",
         };
         serializer.serialize_str(variant)
     }
@@ -548,7 +547,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction {
         const FIELDS: &[&str] = &[
             "MIN",
             "MAX",
-            "ARRAY_AGG",
         ];
 
         struct GeneratedVisitor;
@@ -591,7 +589,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction {
                 match value {
                     "MIN" => Ok(AggregateFunction::Min),
                     "MAX" => Ok(AggregateFunction::Max),
-                    "ARRAY_AGG" => Ok(AggregateFunction::ArrayAgg),
                     _ => Err(serde::de::Error::unknown_variant(value, FIELDS)),
                 }
             }
diff --git a/datafusion/proto/src/generated/prost.rs 
b/datafusion/proto/src/generated/prost.rs
index ba288fe3d1..98b70dc253 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1941,12 +1941,11 @@ pub struct PartitionStats {
 #[repr(i32)]
 pub enum AggregateFunction {
     Min = 0,
-    Max = 1,
     /// SUM = 2;
     /// AVG = 3;
     /// COUNT = 4;
     /// APPROX_DISTINCT = 5;
-    ///
+    /// ARRAY_AGG = 6;
     /// VARIANCE = 7;
     /// VARIANCE_POP = 8;
     /// COVARIANCE = 9;
@@ -1975,7 +1974,7 @@ pub enum AggregateFunction {
     /// REGR_SXY = 34;
     /// STRING_AGG = 35;
     /// NTH_VALUE_AGG = 36;
-    ArrayAgg = 6,
+    Max = 1,
 }
 impl AggregateFunction {
     /// String value of the enum field names used in the ProtoBuf definition.
@@ -1986,7 +1985,6 @@ impl AggregateFunction {
         match self {
             AggregateFunction::Min => "MIN",
             AggregateFunction::Max => "MAX",
-            AggregateFunction::ArrayAgg => "ARRAY_AGG",
         }
     }
     /// Creates an enum from field names used in the ProtoBuf definition.
@@ -1994,7 +1992,6 @@ impl AggregateFunction {
         match value {
             "MIN" => Some(Self::Min),
             "MAX" => Some(Self::Max),
-            "ARRAY_AGG" => Some(Self::ArrayAgg),
             _ => None,
         }
     }
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs 
b/datafusion/proto/src/logical_plan/from_proto.rs
index b6b556a8ed..aea8e454a3 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -144,7 +144,6 @@ impl From<protobuf::AggregateFunction> for 
AggregateFunction {
         match agg_fun {
             protobuf::AggregateFunction::Min => Self::Min,
             protobuf::AggregateFunction::Max => Self::Max,
-            protobuf::AggregateFunction::ArrayAgg => Self::ArrayAgg,
         }
     }
 }
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs 
b/datafusion/proto/src/logical_plan/to_proto.rs
index 9607b918eb..c2441892e8 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -116,7 +116,6 @@ impl From<&AggregateFunction> for 
protobuf::AggregateFunction {
         match value {
             AggregateFunction::Min => Self::Min,
             AggregateFunction::Max => Self::Max,
-            AggregateFunction::ArrayAgg => Self::ArrayAgg,
         }
     }
 }
@@ -386,7 +385,6 @@ pub fn serialize_expr(
         }) => match func_def {
             AggregateFunctionDefinition::BuiltIn(fun) => {
                 let aggr_function = match fun {
-                    AggregateFunction::ArrayAgg => 
protobuf::AggregateFunction::ArrayAgg,
                     AggregateFunction::Min => protobuf::AggregateFunction::Min,
                     AggregateFunction::Max => protobuf::AggregateFunction::Max,
                 };
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt 
b/datafusion/sqllogictest/test_files/aggregate.slt
index bb5ce1150a..fa228d499d 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -129,12 +129,12 @@ query TT
 explain select array_agg(c1 order by c2 desc, c3) from agg_order;
 ----
 logical_plan
-01)Aggregate: groupBy=[[]], aggr=[[ARRAY_AGG(agg_order.c1) ORDER BY 
[agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]]
+01)Aggregate: groupBy=[[]], aggr=[[array_agg(agg_order.c1) ORDER BY 
[agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]]
 02)--TableScan: agg_order projection=[c1, c2, c3]
 physical_plan
-01)AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(agg_order.c1) ORDER BY 
[agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]
+01)AggregateExec: mode=Final, gby=[], aggr=[array_agg(agg_order.c1) ORDER BY 
[agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]
 02)--CoalescePartitionsExec
-03)----AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(agg_order.c1) 
ORDER BY [agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]
+03)----AggregateExec: mode=Partial, gby=[], aggr=[array_agg(agg_order.c1) 
ORDER BY [agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]
 04)------SortExec: expr=[c2@1 DESC,c3@2 ASC NULLS LAST], 
preserve_partitioning=[true]
 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
 06)----------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_agg_multi_order.csv]]}, 
projection=[c1, c2, c3], has_header=true
@@ -231,8 +231,8 @@ explain with A as (
 ) select array_length(array_agg(distinct a.foo)), sum(distinct 1) from A a 
group by a.id;
 ----
 logical_plan
-01)Projection: array_length(ARRAY_AGG(DISTINCT a.foo)), sum(DISTINCT Int64(1))
-02)--Aggregate: groupBy=[[a.id]], aggr=[[ARRAY_AGG(DISTINCT a.foo), 
sum(DISTINCT Int64(1))]]
+01)Projection: array_length(array_agg(DISTINCT a.foo)), sum(DISTINCT Int64(1))
+02)--Aggregate: groupBy=[[a.id]], aggr=[[array_agg(DISTINCT a.foo), 
sum(DISTINCT Int64(1))]]
 03)----SubqueryAlias: a
 04)------SubqueryAlias: a
 05)--------Union
@@ -247,11 +247,11 @@ logical_plan
 14)----------Projection: Int64(1) AS id, Int64(2) AS foo
 15)------------EmptyRelation
 physical_plan
-01)ProjectionExec: expr=[array_length(ARRAY_AGG(DISTINCT a.foo)@1) as 
array_length(ARRAY_AGG(DISTINCT a.foo)), sum(DISTINCT Int64(1))@2 as 
sum(DISTINCT Int64(1))]
-02)--AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], 
aggr=[ARRAY_AGG(DISTINCT a.foo), sum(DISTINCT Int64(1))]
+01)ProjectionExec: expr=[array_length(array_agg(DISTINCT a.foo)@1) as 
array_length(array_agg(DISTINCT a.foo)), sum(DISTINCT Int64(1))@2 as 
sum(DISTINCT Int64(1))]
+02)--AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], 
aggr=[array_agg(DISTINCT a.foo), sum(DISTINCT Int64(1))]
 03)----CoalesceBatchesExec: target_batch_size=8192
 04)------RepartitionExec: partitioning=Hash([id@0], 4), input_partitions=5
-05)--------AggregateExec: mode=Partial, gby=[id@0 as id], 
aggr=[ARRAY_AGG(DISTINCT a.foo), sum(DISTINCT Int64(1))], ordering_mode=Sorted
+05)--------AggregateExec: mode=Partial, gby=[id@0 as id], 
aggr=[array_agg(DISTINCT a.foo), sum(DISTINCT Int64(1))], ordering_mode=Sorted
 06)----------UnionExec
 07)------------ProjectionExec: expr=[1 as id, 2 as foo]
 08)--------------PlaceholderRowExec
diff --git a/datafusion/sqllogictest/test_files/binary_view.slt 
b/datafusion/sqllogictest/test_files/binary_view.slt
index de0f0bea7f..77ec77c5ec 100644
--- a/datafusion/sqllogictest/test_files/binary_view.slt
+++ b/datafusion/sqllogictest/test_files/binary_view.slt
@@ -199,4 +199,4 @@ Raphael R false false true true
 NULL R NULL NULL NULL NULL
 
 statement ok
-drop table test;
\ No newline at end of file
+drop table test;
diff --git a/datafusion/sqllogictest/test_files/group_by.slt 
b/datafusion/sqllogictest/test_files/group_by.slt
index b2be65a609..a3cc10e1ee 100644
--- a/datafusion/sqllogictest/test_files/group_by.slt
+++ b/datafusion/sqllogictest/test_files/group_by.slt
@@ -2289,10 +2289,10 @@ FROM annotated_data_infinite2
 GROUP BY a, b;
 ----
 logical_plan
-01)Aggregate: groupBy=[[annotated_data_infinite2.a, 
annotated_data_infinite2.b]], aggr=[[ARRAY_AGG(annotated_data_infinite2.d) 
ORDER BY [annotated_data_infinite2.d ASC NULLS LAST]]]
+01)Aggregate: groupBy=[[annotated_data_infinite2.a, 
annotated_data_infinite2.b]], aggr=[[array_agg(annotated_data_infinite2.d) 
ORDER BY [annotated_data_infinite2.d ASC NULLS LAST]]]
 02)--TableScan: annotated_data_infinite2 projection=[a, b, d]
 physical_plan
-01)AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], 
aggr=[ARRAY_AGG(annotated_data_infinite2.d) ORDER BY 
[annotated_data_infinite2.d ASC NULLS LAST]], ordering_mode=Sorted
+01)AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], 
aggr=[array_agg(annotated_data_infinite2.d) ORDER BY 
[annotated_data_infinite2.d ASC NULLS LAST]], ordering_mode=Sorted
 02)--PartialSortExec: expr=[a@0 ASC NULLS LAST,b@1 ASC NULLS LAST,d@2 ASC 
NULLS LAST], common_prefix_length=[2]
 03)----StreamingTableExec: partition_sizes=1, projection=[a, b, d], 
infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST]
 
@@ -2459,12 +2459,12 @@ EXPLAIN SELECT country, (ARRAY_AGG(amount ORDER BY 
amount ASC)) AS amounts
   GROUP BY country
 ----
 logical_plan
-01)Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY 
[sales_global.amount ASC NULLS LAST] AS amounts
-02)--Aggregate: groupBy=[[sales_global.country]], 
aggr=[[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]]]
+01)Projection: sales_global.country, array_agg(sales_global.amount) ORDER BY 
[sales_global.amount ASC NULLS LAST] AS amounts
+02)--Aggregate: groupBy=[[sales_global.country]], 
aggr=[[array_agg(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]]]
 03)----TableScan: sales_global projection=[country, amount]
 physical_plan
-01)ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) 
ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts]
-02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]]
+01)ProjectionExec: expr=[country@0 as country, array_agg(sales_global.amount) 
ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts]
+02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[array_agg(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]]
 03)----SortExec: expr=[amount@1 ASC NULLS LAST], preserve_partitioning=[false]
 04)------MemoryExec: partitions=1, partition_sizes=[1]
 
@@ -2488,13 +2488,13 @@ EXPLAIN SELECT s.country, ARRAY_AGG(s.amount ORDER BY 
s.amount DESC) AS amounts,
         GROUP BY s.country
 ----
 logical_plan
-01)Projection: s.country, ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS 
FIRST] AS amounts, sum(s.amount) AS sum1
-02)--Aggregate: groupBy=[[s.country]], aggr=[[ARRAY_AGG(s.amount) ORDER BY 
[s.amount DESC NULLS FIRST], sum(CAST(s.amount AS Float64))]]
+01)Projection: s.country, array_agg(s.amount) ORDER BY [s.amount DESC NULLS 
FIRST] AS amounts, sum(s.amount) AS sum1
+02)--Aggregate: groupBy=[[s.country]], aggr=[[array_agg(s.amount) ORDER BY 
[s.amount DESC NULLS FIRST], sum(CAST(s.amount AS Float64))]]
 03)----SubqueryAlias: s
 04)------TableScan: sales_global projection=[country, amount]
 physical_plan
-01)ProjectionExec: expr=[country@0 as country, ARRAY_AGG(s.amount) ORDER BY 
[s.amount DESC NULLS FIRST]@1 as amounts, sum(s.amount)@2 as sum1]
-02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST], sum(s.amount)]
+01)ProjectionExec: expr=[country@0 as country, array_agg(s.amount) ORDER BY 
[s.amount DESC NULLS FIRST]@1 as amounts, sum(s.amount)@2 as sum1]
+02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[array_agg(s.amount) ORDER BY [s.amount DESC NULLS FIRST], sum(s.amount)]
 03)----SortExec: expr=[amount@1 DESC], preserve_partitioning=[false]
 04)------MemoryExec: partitions=1, partition_sizes=[1]
 
@@ -2531,14 +2531,14 @@ EXPLAIN SELECT s.country, ARRAY_AGG(s.amount ORDER BY 
s.amount DESC) AS amounts,
           GROUP BY s.country
 ----
 logical_plan
-01)Projection: s.country, ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS 
FIRST] AS amounts, sum(s.amount) AS sum1
-02)--Aggregate: groupBy=[[s.country]], aggr=[[ARRAY_AGG(s.amount) ORDER BY 
[s.amount DESC NULLS FIRST], sum(CAST(s.amount AS Float64))]]
+01)Projection: s.country, array_agg(s.amount) ORDER BY [s.amount DESC NULLS 
FIRST] AS amounts, sum(s.amount) AS sum1
+02)--Aggregate: groupBy=[[s.country]], aggr=[[array_agg(s.amount) ORDER BY 
[s.amount DESC NULLS FIRST], sum(CAST(s.amount AS Float64))]]
 03)----SubqueryAlias: s
 04)------Sort: sales_global.country ASC NULLS LAST
 05)--------TableScan: sales_global projection=[country, amount]
 physical_plan
-01)ProjectionExec: expr=[country@0 as country, ARRAY_AGG(s.amount) ORDER BY 
[s.amount DESC NULLS FIRST]@1 as amounts, sum(s.amount)@2 as sum1]
-02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST], sum(s.amount)], 
ordering_mode=Sorted
+01)ProjectionExec: expr=[country@0 as country, array_agg(s.amount) ORDER BY 
[s.amount DESC NULLS FIRST]@1 as amounts, sum(s.amount)@2 as sum1]
+02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[array_agg(s.amount) ORDER BY [s.amount DESC NULLS FIRST], sum(s.amount)], 
ordering_mode=Sorted
 03)----SortExec: expr=[country@0 ASC NULLS LAST,amount@1 DESC], 
preserve_partitioning=[false]
 04)------MemoryExec: partitions=1, partition_sizes=[1]
 
@@ -2567,14 +2567,14 @@ EXPLAIN SELECT s.country, s.zip_code, 
ARRAY_AGG(s.amount ORDER BY s.amount DESC)
           GROUP BY s.country, s.zip_code
 ----
 logical_plan
-01)Projection: s.country, s.zip_code, ARRAY_AGG(s.amount) ORDER BY [s.amount 
DESC NULLS FIRST] AS amounts, sum(s.amount) AS sum1
-02)--Aggregate: groupBy=[[s.country, s.zip_code]], aggr=[[ARRAY_AGG(s.amount) 
ORDER BY [s.amount DESC NULLS FIRST], sum(CAST(s.amount AS Float64))]]
+01)Projection: s.country, s.zip_code, array_agg(s.amount) ORDER BY [s.amount 
DESC NULLS FIRST] AS amounts, sum(s.amount) AS sum1
+02)--Aggregate: groupBy=[[s.country, s.zip_code]], aggr=[[array_agg(s.amount) 
ORDER BY [s.amount DESC NULLS FIRST], sum(CAST(s.amount AS Float64))]]
 03)----SubqueryAlias: s
 04)------Sort: sales_global.country ASC NULLS LAST
 05)--------TableScan: sales_global projection=[zip_code, country, amount]
 physical_plan
-01)ProjectionExec: expr=[country@0 as country, zip_code@1 as zip_code, 
ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST]@2 as amounts, 
sum(s.amount)@3 as sum1]
-02)--AggregateExec: mode=Single, gby=[country@1 as country, zip_code@0 as 
zip_code], aggr=[ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST], 
sum(s.amount)], ordering_mode=PartiallySorted([0])
+01)ProjectionExec: expr=[country@0 as country, zip_code@1 as zip_code, 
array_agg(s.amount) ORDER BY [s.amount DESC NULLS FIRST]@2 as amounts, 
sum(s.amount)@3 as sum1]
+02)--AggregateExec: mode=Single, gby=[country@1 as country, zip_code@0 as 
zip_code], aggr=[array_agg(s.amount) ORDER BY [s.amount DESC NULLS FIRST], 
sum(s.amount)], ordering_mode=PartiallySorted([0])
 03)----SortExec: expr=[country@1 ASC NULLS LAST,amount@2 DESC], 
preserve_partitioning=[false]
 04)------MemoryExec: partitions=1, partition_sizes=[1]
 
@@ -2603,14 +2603,14 @@ EXPLAIN SELECT s.country, ARRAY_AGG(s.amount ORDER BY 
s.country DESC) AS amounts
           GROUP BY s.country
 ----
 logical_plan
-01)Projection: s.country, ARRAY_AGG(s.amount) ORDER BY [s.country DESC NULLS 
FIRST] AS amounts, sum(s.amount) AS sum1
-02)--Aggregate: groupBy=[[s.country]], aggr=[[ARRAY_AGG(s.amount) ORDER BY 
[s.country DESC NULLS FIRST], sum(CAST(s.amount AS Float64))]]
+01)Projection: s.country, array_agg(s.amount) ORDER BY [s.country DESC NULLS 
FIRST] AS amounts, sum(s.amount) AS sum1
+02)--Aggregate: groupBy=[[s.country]], aggr=[[array_agg(s.amount) ORDER BY 
[s.country DESC NULLS FIRST], sum(CAST(s.amount AS Float64))]]
 03)----SubqueryAlias: s
 04)------Sort: sales_global.country ASC NULLS LAST
 05)--------TableScan: sales_global projection=[country, amount]
 physical_plan
-01)ProjectionExec: expr=[country@0 as country, ARRAY_AGG(s.amount) ORDER BY 
[s.country DESC NULLS FIRST]@1 as amounts, sum(s.amount)@2 as sum1]
-02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[ARRAY_AGG(s.amount) ORDER BY [s.country DESC NULLS FIRST], 
sum(s.amount)], ordering_mode=Sorted
+01)ProjectionExec: expr=[country@0 as country, array_agg(s.amount) ORDER BY 
[s.country DESC NULLS FIRST]@1 as amounts, sum(s.amount)@2 as sum1]
+02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[array_agg(s.amount) ORDER BY [s.country DESC NULLS FIRST], 
sum(s.amount)], ordering_mode=Sorted
 03)----SortExec: expr=[country@0 ASC NULLS LAST], preserve_partitioning=[false]
 04)------MemoryExec: partitions=1, partition_sizes=[1]
 
@@ -2638,14 +2638,14 @@ EXPLAIN SELECT s.country, ARRAY_AGG(s.amount ORDER BY 
s.country DESC, s.amount D
           GROUP BY s.country
 ----
 logical_plan
-01)Projection: s.country, ARRAY_AGG(s.amount) ORDER BY [s.country DESC NULLS 
FIRST, s.amount DESC NULLS FIRST] AS amounts, sum(s.amount) AS sum1
-02)--Aggregate: groupBy=[[s.country]], aggr=[[ARRAY_AGG(s.amount) ORDER BY 
[s.country DESC NULLS FIRST, s.amount DESC NULLS FIRST], sum(CAST(s.amount AS 
Float64))]]
+01)Projection: s.country, array_agg(s.amount) ORDER BY [s.country DESC NULLS 
FIRST, s.amount DESC NULLS FIRST] AS amounts, sum(s.amount) AS sum1
+02)--Aggregate: groupBy=[[s.country]], aggr=[[array_agg(s.amount) ORDER BY 
[s.country DESC NULLS FIRST, s.amount DESC NULLS FIRST], sum(CAST(s.amount AS 
Float64))]]
 03)----SubqueryAlias: s
 04)------Sort: sales_global.country ASC NULLS LAST
 05)--------TableScan: sales_global projection=[country, amount]
 physical_plan
-01)ProjectionExec: expr=[country@0 as country, ARRAY_AGG(s.amount) ORDER BY 
[s.country DESC NULLS FIRST, s.amount DESC NULLS FIRST]@1 as amounts, 
sum(s.amount)@2 as sum1]
-02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[ARRAY_AGG(s.amount) ORDER BY [s.country DESC NULLS FIRST, s.amount DESC 
NULLS FIRST], sum(s.amount)], ordering_mode=Sorted
+01)ProjectionExec: expr=[country@0 as country, array_agg(s.amount) ORDER BY 
[s.country DESC NULLS FIRST, s.amount DESC NULLS FIRST]@1 as amounts, 
sum(s.amount)@2 as sum1]
+02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[array_agg(s.amount) ORDER BY [s.country DESC NULLS FIRST, s.amount DESC 
NULLS FIRST], sum(s.amount)], ordering_mode=Sorted
 03)----SortExec: expr=[country@0 ASC NULLS LAST,amount@1 DESC], 
preserve_partitioning=[false]
 04)------MemoryExec: partitions=1, partition_sizes=[1]
 
@@ -2672,12 +2672,12 @@ EXPLAIN SELECT country, ARRAY_AGG(amount ORDER BY 
amount DESC) AS amounts,
   GROUP BY country
 ----
 logical_plan
-01)Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY 
[sales_global.amount DESC NULLS FIRST] AS amounts, 
first_value(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] 
AS fv1, last_value(sales_global.amount) ORDER BY [sales_global.amount DESC 
NULLS FIRST] AS fv2
-02)--Aggregate: groupBy=[[sales_global.country]], 
aggr=[[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS 
FIRST], first_value(sales_global.amount) ORDER BY [sales_global.amount ASC 
NULLS LAST], last_value(sales_global.amount) ORDER BY [sales_global.amount DESC 
NULLS FIRST]]]
+01)Projection: sales_global.country, array_agg(sales_global.amount) ORDER BY 
[sales_global.amount DESC NULLS FIRST] AS amounts, 
first_value(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] 
AS fv1, last_value(sales_global.amount) ORDER BY [sales_global.amount DESC 
NULLS FIRST] AS fv2
+02)--Aggregate: groupBy=[[sales_global.country]], 
aggr=[[array_agg(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS 
FIRST], first_value(sales_global.amount) ORDER BY [sales_global.amount ASC 
NULLS LAST], last_value(sales_global.amount) ORDER BY [sales_global.amount DESC 
NULLS FIRST]]]
 03)----TableScan: sales_global projection=[country, amount]
 physical_plan
-01)ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) 
ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts, 
first_value(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]@2 as fv1, last_value(sales_global.amount) ORDER BY [sales_global.amount 
DESC NULLS FIRST]@3 as fv2]
-02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS 
FIRST], last_value(sales_global.amount) ORDER BY [sales_global.amount DESC 
NULLS FIRST], last_value(sales_global.amount) ORDER BY [sales_global.amount 
DESC NULLS FIRST]]
+01)ProjectionExec: expr=[country@0 as country, array_agg(sales_global.amount) 
ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts, 
first_value(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]@2 as fv1, last_value(sales_global.amount) ORDER BY [sales_global.amount 
DESC NULLS FIRST]@3 as fv2]
+02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[array_agg(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS 
FIRST], last_value(sales_global.amount) ORDER BY [sales_global.amount DESC 
NULLS FIRST], last_value(sales_global.amount) ORDER BY [sales_global.amount 
DESC NULLS FIRST]]
 03)----SortExec: expr=[amount@1 DESC], preserve_partitioning=[false]
 04)------MemoryExec: partitions=1, partition_sizes=[1]
 
@@ -2703,12 +2703,12 @@ EXPLAIN SELECT country, ARRAY_AGG(amount ORDER BY 
amount ASC) AS amounts,
   GROUP BY country
 ----
 logical_plan
-01)Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY 
[sales_global.amount ASC NULLS LAST] AS amounts, 
first_value(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] 
AS fv1, last_value(sales_global.amount) ORDER BY [sales_global.amount DESC 
NULLS FIRST] AS fv2
-02)--Aggregate: groupBy=[[sales_global.country]], 
aggr=[[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST], first_value(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST], last_value(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS 
FIRST]]]
+01)Projection: sales_global.country, array_agg(sales_global.amount) ORDER BY 
[sales_global.amount ASC NULLS LAST] AS amounts, 
first_value(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] 
AS fv1, last_value(sales_global.amount) ORDER BY [sales_global.amount DESC 
NULLS FIRST] AS fv2
+02)--Aggregate: groupBy=[[sales_global.country]], 
aggr=[[array_agg(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST], first_value(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST], last_value(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS 
FIRST]]]
 03)----TableScan: sales_global projection=[country, amount]
 physical_plan
-01)ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) 
ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts, 
first_value(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]@2 as fv1, last_value(sales_global.amount) ORDER BY [sales_global.amount 
DESC NULLS FIRST]@3 as fv2]
-02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST], first_value(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST], first_value(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]]
+01)ProjectionExec: expr=[country@0 as country, array_agg(sales_global.amount) 
ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts, 
first_value(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]@2 as fv1, last_value(sales_global.amount) ORDER BY [sales_global.amount 
DESC NULLS FIRST]@3 as fv2]
+02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[array_agg(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST], first_value(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST], first_value(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]]
 03)----SortExec: expr=[amount@1 ASC NULLS LAST], preserve_partitioning=[false]
 04)------MemoryExec: partitions=1, partition_sizes=[1]
 
@@ -2735,12 +2735,12 @@ EXPLAIN SELECT country, FIRST_VALUE(amount ORDER BY 
amount ASC) AS fv1,
   GROUP BY country
 ----
 logical_plan
-01)Projection: sales_global.country, first_value(sales_global.amount) ORDER BY 
[sales_global.amount ASC NULLS LAST] AS fv1, last_value(sales_global.amount) 
ORDER BY [sales_global.amount DESC NULLS FIRST] AS fv2, 
ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS 
amounts
-02)--Aggregate: groupBy=[[sales_global.country]], 
aggr=[[first_value(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST], last_value(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS 
FIRST], ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]]]
+01)Projection: sales_global.country, first_value(sales_global.amount) ORDER BY 
[sales_global.amount ASC NULLS LAST] AS fv1, last_value(sales_global.amount) 
ORDER BY [sales_global.amount DESC NULLS FIRST] AS fv2, 
array_agg(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS 
amounts
+02)--Aggregate: groupBy=[[sales_global.country]], 
aggr=[[first_value(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST], last_value(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS 
FIRST], array_agg(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]]]
 03)----TableScan: sales_global projection=[country, amount]
 physical_plan
-01)ProjectionExec: expr=[country@0 as country, 
first_value(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]@1 as fv1, last_value(sales_global.amount) ORDER BY [sales_global.amount 
DESC NULLS FIRST]@2 as fv2, ARRAY_AGG(sales_global.amount) ORDER BY 
[sales_global.amount ASC NULLS LAST]@3 as amounts]
-02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[first_value(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST], first_value(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST], ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]]
+01)ProjectionExec: expr=[country@0 as country, 
first_value(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]@1 as fv1, last_value(sales_global.amount) ORDER BY [sales_global.amount 
DESC NULLS FIRST]@2 as fv2, array_agg(sales_global.amount) ORDER BY 
[sales_global.amount ASC NULLS LAST]@3 as amounts]
+02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[first_value(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST], first_value(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST], array_agg(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]]
 03)----SortExec: expr=[amount@1 ASC NULLS LAST], preserve_partitioning=[false]
 04)------MemoryExec: partitions=1, partition_sizes=[1]
 
@@ -2765,12 +2765,12 @@ EXPLAIN SELECT country, SUM(amount ORDER BY ts DESC) AS 
sum1,
   GROUP BY country
 ----
 logical_plan
-01)Projection: sales_global.country, sum(sales_global.amount) ORDER BY 
[sales_global.ts DESC NULLS FIRST] AS sum1, ARRAY_AGG(sales_global.amount) 
ORDER BY [sales_global.amount ASC NULLS LAST] AS amounts
-02)--Aggregate: groupBy=[[sales_global.country]], 
aggr=[[sum(CAST(sales_global.amount AS Float64)) ORDER BY [sales_global.ts DESC 
NULLS FIRST], ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC 
NULLS LAST]]]
+01)Projection: sales_global.country, sum(sales_global.amount) ORDER BY 
[sales_global.ts DESC NULLS FIRST] AS sum1, array_agg(sales_global.amount) 
ORDER BY [sales_global.amount ASC NULLS LAST] AS amounts
+02)--Aggregate: groupBy=[[sales_global.country]], 
aggr=[[sum(CAST(sales_global.amount AS Float64)) ORDER BY [sales_global.ts DESC 
NULLS FIRST], array_agg(sales_global.amount) ORDER BY [sales_global.amount ASC 
NULLS LAST]]]
 03)----TableScan: sales_global projection=[country, ts, amount]
 physical_plan
-01)ProjectionExec: expr=[country@0 as country, sum(sales_global.amount) ORDER 
BY [sales_global.ts DESC NULLS FIRST]@1 as sum1, ARRAY_AGG(sales_global.amount) 
ORDER BY [sales_global.amount ASC NULLS LAST]@2 as amounts]
-02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[sum(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], 
ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]]
+01)ProjectionExec: expr=[country@0 as country, sum(sales_global.amount) ORDER 
BY [sales_global.ts DESC NULLS FIRST]@1 as sum1, array_agg(sales_global.amount) 
ORDER BY [sales_global.amount ASC NULLS LAST]@2 as amounts]
+02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[sum(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], 
array_agg(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]]
 03)----SortExec: expr=[amount@2 ASC NULLS LAST], preserve_partitioning=[false]
 04)------MemoryExec: partitions=1, partition_sizes=[1]
 
@@ -3036,14 +3036,14 @@ EXPLAIN SELECT ARRAY_AGG(amount ORDER BY ts ASC) AS 
array_agg1
   FROM sales_global
 ----
 logical_plan
-01)Projection: ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.ts ASC 
NULLS LAST] AS array_agg1
-02)--Aggregate: groupBy=[[]], aggr=[[ARRAY_AGG(sales_global.amount) ORDER BY 
[sales_global.ts ASC NULLS LAST]]]
+01)Projection: array_agg(sales_global.amount) ORDER BY [sales_global.ts ASC 
NULLS LAST] AS array_agg1
+02)--Aggregate: groupBy=[[]], aggr=[[array_agg(sales_global.amount) ORDER BY 
[sales_global.ts ASC NULLS LAST]]]
 03)----TableScan: sales_global projection=[ts, amount]
 physical_plan
-01)ProjectionExec: expr=[ARRAY_AGG(sales_global.amount) ORDER BY 
[sales_global.ts ASC NULLS LAST]@0 as array_agg1]
-02)--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(sales_global.amount) 
ORDER BY [sales_global.ts ASC NULLS LAST]]
+01)ProjectionExec: expr=[array_agg(sales_global.amount) ORDER BY 
[sales_global.ts ASC NULLS LAST]@0 as array_agg1]
+02)--AggregateExec: mode=Final, gby=[], aggr=[array_agg(sales_global.amount) 
ORDER BY [sales_global.ts ASC NULLS LAST]]
 03)----CoalescePartitionsExec
-04)------AggregateExec: mode=Partial, gby=[], 
aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]]
+04)------AggregateExec: mode=Partial, gby=[], 
aggr=[array_agg(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]]
 05)--------SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true]
 06)----------RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1
 07)------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -3060,14 +3060,14 @@ EXPLAIN SELECT ARRAY_AGG(amount ORDER BY ts DESC) AS 
array_agg1
   FROM sales_global
 ----
 logical_plan
-01)Projection: ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.ts DESC 
NULLS FIRST] AS array_agg1
-02)--Aggregate: groupBy=[[]], aggr=[[ARRAY_AGG(sales_global.amount) ORDER BY 
[sales_global.ts DESC NULLS FIRST]]]
+01)Projection: array_agg(sales_global.amount) ORDER BY [sales_global.ts DESC 
NULLS FIRST] AS array_agg1
+02)--Aggregate: groupBy=[[]], aggr=[[array_agg(sales_global.amount) ORDER BY 
[sales_global.ts DESC NULLS FIRST]]]
 03)----TableScan: sales_global projection=[ts, amount]
 physical_plan
-01)ProjectionExec: expr=[ARRAY_AGG(sales_global.amount) ORDER BY 
[sales_global.ts DESC NULLS FIRST]@0 as array_agg1]
-02)--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(sales_global.amount) 
ORDER BY [sales_global.ts DESC NULLS FIRST]]
+01)ProjectionExec: expr=[array_agg(sales_global.amount) ORDER BY 
[sales_global.ts DESC NULLS FIRST]@0 as array_agg1]
+02)--AggregateExec: mode=Final, gby=[], aggr=[array_agg(sales_global.amount) 
ORDER BY [sales_global.ts DESC NULLS FIRST]]
 03)----CoalescePartitionsExec
-04)------AggregateExec: mode=Partial, gby=[], 
aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS 
FIRST]]
+04)------AggregateExec: mode=Partial, gby=[], 
aggr=[array_agg(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS 
FIRST]]
 05)--------SortExec: expr=[ts@0 DESC], preserve_partitioning=[true]
 06)----------RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1
 07)------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -3084,14 +3084,14 @@ EXPLAIN SELECT ARRAY_AGG(amount ORDER BY amount ASC) AS 
array_agg1
   FROM sales_global
 ----
 logical_plan
-01)Projection: ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount 
ASC NULLS LAST] AS array_agg1
-02)--Aggregate: groupBy=[[]], aggr=[[ARRAY_AGG(sales_global.amount) ORDER BY 
[sales_global.amount ASC NULLS LAST]]]
+01)Projection: array_agg(sales_global.amount) ORDER BY [sales_global.amount 
ASC NULLS LAST] AS array_agg1
+02)--Aggregate: groupBy=[[]], aggr=[[array_agg(sales_global.amount) ORDER BY 
[sales_global.amount ASC NULLS LAST]]]
 03)----TableScan: sales_global projection=[amount]
 physical_plan
-01)ProjectionExec: expr=[ARRAY_AGG(sales_global.amount) ORDER BY 
[sales_global.amount ASC NULLS LAST]@0 as array_agg1]
-02)--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(sales_global.amount) 
ORDER BY [sales_global.amount ASC NULLS LAST]]
+01)ProjectionExec: expr=[array_agg(sales_global.amount) ORDER BY 
[sales_global.amount ASC NULLS LAST]@0 as array_agg1]
+02)--AggregateExec: mode=Final, gby=[], aggr=[array_agg(sales_global.amount) 
ORDER BY [sales_global.amount ASC NULLS LAST]]
 03)----CoalescePartitionsExec
-04)------AggregateExec: mode=Partial, gby=[], 
aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]]
+04)------AggregateExec: mode=Partial, gby=[], 
aggr=[array_agg(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]]
 05)--------SortExec: expr=[amount@0 ASC NULLS LAST], 
preserve_partitioning=[true]
 06)----------RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1
 07)------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -3111,17 +3111,17 @@ EXPLAIN SELECT country, ARRAY_AGG(amount ORDER BY 
amount ASC) AS array_agg1
 ----
 logical_plan
 01)Sort: sales_global.country ASC NULLS LAST
-02)--Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY 
[sales_global.amount ASC NULLS LAST] AS array_agg1
-03)----Aggregate: groupBy=[[sales_global.country]], 
aggr=[[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]]]
+02)--Projection: sales_global.country, array_agg(sales_global.amount) ORDER BY 
[sales_global.amount ASC NULLS LAST] AS array_agg1
+03)----Aggregate: groupBy=[[sales_global.country]], 
aggr=[[array_agg(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]]]
 04)------TableScan: sales_global projection=[country, amount]
 physical_plan
 01)SortPreservingMergeExec: [country@0 ASC NULLS LAST]
 02)--SortExec: expr=[country@0 ASC NULLS LAST], preserve_partitioning=[true]
-03)----ProjectionExec: expr=[country@0 as country, 
ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 
as array_agg1]
-04)------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], 
aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]]
+03)----ProjectionExec: expr=[country@0 as country, 
array_agg(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 
as array_agg1]
+04)------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], 
aggr=[array_agg(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]]
 05)--------CoalesceBatchesExec: target_batch_size=4
 06)----------RepartitionExec: partitioning=Hash([country@0], 8), 
input_partitions=8
-07)------------AggregateExec: mode=Partial, gby=[country@0 as country], 
aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]]
+07)------------AggregateExec: mode=Partial, gby=[country@0 as country], 
aggr=[array_agg(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]]
 08)--------------SortExec: expr=[amount@1 ASC NULLS LAST], 
preserve_partitioning=[true]
 09)----------------RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1
 10)------------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -3147,17 +3147,17 @@ EXPLAIN SELECT country, ARRAY_AGG(amount ORDER BY 
amount DESC) AS amounts,
 ----
 logical_plan
 01)Sort: sales_global.country ASC NULLS LAST
-02)--Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY 
[sales_global.amount DESC NULLS FIRST] AS amounts, 
first_value(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] 
AS fv1, last_value(sales_global.amount) ORDER BY [sales_global.amount DESC 
NULLS FIRST] AS fv2
-03)----Aggregate: groupBy=[[sales_global.country]], 
aggr=[[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS 
FIRST], first_value(sales_global.amount) ORDER BY [sales_global.amount ASC 
NULLS LAST], last_value(sales_global.amount) ORDER BY [sales_global.amount DESC 
NULLS FIRST]]]
+02)--Projection: sales_global.country, array_agg(sales_global.amount) ORDER BY 
[sales_global.amount DESC NULLS FIRST] AS amounts, 
first_value(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] 
AS fv1, last_value(sales_global.amount) ORDER BY [sales_global.amount DESC 
NULLS FIRST] AS fv2
+03)----Aggregate: groupBy=[[sales_global.country]], 
aggr=[[array_agg(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS 
FIRST], first_value(sales_global.amount) ORDER BY [sales_global.amount ASC 
NULLS LAST], last_value(sales_global.amount) ORDER BY [sales_global.amount DESC 
NULLS FIRST]]]
 04)------TableScan: sales_global projection=[country, amount]
 physical_plan
 01)SortPreservingMergeExec: [country@0 ASC NULLS LAST]
 02)--SortExec: expr=[country@0 ASC NULLS LAST], preserve_partitioning=[true]
-03)----ProjectionExec: expr=[country@0 as country, 
ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS 
FIRST]@1 as amounts, first_value(sales_global.amount) ORDER BY 
[sales_global.amount ASC NULLS LAST]@2 as fv1, last_value(sales_global.amount) 
ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2]
-04)------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], 
aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS 
FIRST], first_value(sales_global.amount) ORDER BY [sales_global.amount ASC 
NULLS LAST], last_value(sales_global.amount) ORDER BY [sales_global.amount DESC 
NULLS FIRST]]
+03)----ProjectionExec: expr=[country@0 as country, 
array_agg(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS 
FIRST]@1 as amounts, first_value(sales_global.amount) ORDER BY 
[sales_global.amount ASC NULLS LAST]@2 as fv1, last_value(sales_global.amount) 
ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2]
+04)------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], 
aggr=[array_agg(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS 
FIRST], first_value(sales_global.amount) ORDER BY [sales_global.amount ASC 
NULLS LAST], last_value(sales_global.amount) ORDER BY [sales_global.amount DESC 
NULLS FIRST]]
 05)--------CoalesceBatchesExec: target_batch_size=4
 06)----------RepartitionExec: partitioning=Hash([country@0], 8), 
input_partitions=8
-07)------------AggregateExec: mode=Partial, gby=[country@0 as country], 
aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS 
FIRST], last_value(sales_global.amount) ORDER BY [sales_global.amount DESC 
NULLS FIRST], last_value(sales_global.amount) ORDER BY [sales_global.amount 
DESC NULLS FIRST]]
+07)------------AggregateExec: mode=Partial, gby=[country@0 as country], 
aggr=[array_agg(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS 
FIRST], last_value(sales_global.amount) ORDER BY [sales_global.amount DESC 
NULLS FIRST], last_value(sales_global.amount) ORDER BY [sales_global.amount 
DESC NULLS FIRST]]
 08)--------------SortExec: expr=[amount@1 DESC], preserve_partitioning=[true]
 09)----------------RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1
 10)------------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -4971,10 +4971,10 @@ ORDER BY a, b;
 ----
 logical_plan
 01)Sort: multiple_ordered_table.a ASC NULLS LAST, multiple_ordered_table.b ASC 
NULLS LAST
-02)--Aggregate: groupBy=[[multiple_ordered_table.a, 
multiple_ordered_table.b]], aggr=[[ARRAY_AGG(multiple_ordered_table.c) ORDER BY 
[multiple_ordered_table.c DESC NULLS FIRST]]]
+02)--Aggregate: groupBy=[[multiple_ordered_table.a, 
multiple_ordered_table.b]], aggr=[[array_agg(multiple_ordered_table.c) ORDER BY 
[multiple_ordered_table.c DESC NULLS FIRST]]]
 03)----TableScan: multiple_ordered_table projection=[a, b, c]
 physical_plan
-01)AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], 
aggr=[ARRAY_AGG(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c 
DESC NULLS FIRST]], ordering_mode=Sorted
+01)AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], 
aggr=[array_agg(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c 
DESC NULLS FIRST]], ordering_mode=Sorted
 02)--CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, 
c], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS 
LAST]], has_header=true
 
 query II?
diff --git a/datafusion/sqllogictest/test_files/window.slt 
b/datafusion/sqllogictest/test_files/window.slt
index 37214e11ea..e9d417c93a 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -2040,16 +2040,16 @@ query TT
 EXPLAIN SELECT ARRAY_AGG(c13) as array_agg1 FROM (SELECT * FROM 
aggregate_test_100 ORDER BY c13 LIMIT 1)
 ----
 logical_plan
-01)Projection: ARRAY_AGG(aggregate_test_100.c13) AS array_agg1
-02)--Aggregate: groupBy=[[]], aggr=[[ARRAY_AGG(aggregate_test_100.c13)]]
+01)Projection: array_agg(aggregate_test_100.c13) AS array_agg1
+02)--Aggregate: groupBy=[[]], aggr=[[array_agg(aggregate_test_100.c13)]]
 03)----Limit: skip=0, fetch=1
 04)------Sort: aggregate_test_100.c13 ASC NULLS LAST, fetch=1
 05)--------TableScan: aggregate_test_100 projection=[c13]
 physical_plan
-01)ProjectionExec: expr=[ARRAY_AGG(aggregate_test_100.c13)@0 as array_agg1]
-02)--AggregateExec: mode=Final, gby=[], 
aggr=[ARRAY_AGG(aggregate_test_100.c13)]
+01)ProjectionExec: expr=[array_agg(aggregate_test_100.c13)@0 as array_agg1]
+02)--AggregateExec: mode=Final, gby=[], 
aggr=[array_agg(aggregate_test_100.c13)]
 03)----CoalescePartitionsExec
-04)------AggregateExec: mode=Partial, gby=[], 
aggr=[ARRAY_AGG(aggregate_test_100.c13)]
+04)------AggregateExec: mode=Partial, gby=[], 
aggr=[array_agg(aggregate_test_100.c13)]
 05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
 06)----------GlobalLimitExec: skip=0, fetch=1
 07)------------SortExec: TopK(fetch=1), expr=[c13@0 ASC NULLS LAST], 
preserve_partitioning=[false]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to