alamb commented on code in PR #6482:
URL: https://github.com/apache/arrow-datafusion/pull/6482#discussion_r1212262351


##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -339,40 +340,96 @@ fn output_group_expr_helper(group_by: &PhysicalGroupBy) 
-> Vec<Arc<dyn PhysicalE
         .collect()
 }
 
+/// This function returns the ordering requirement of the first non-reversible
+/// order-sensitive aggregate function such as ARRAY_AGG. This requirement 
serves
+/// as the initial requirement while calculating the finest requirement among 
all
+/// aggregate functions. If this function returns `None`, it means there is no
+/// hard ordering requirement for the aggregate functions (in terms of 
direction).
+/// Then, we can generate two alternative requirements with opposite 
directions.
+fn get_init_req(
+    aggr_expr: &[Arc<dyn AggregateExpr>],
+    order_by_expr: &[Option<LexOrdering>],
+) -> Option<LexOrdering> {
+    for (aggr_expr, fn_reqs) in aggr_expr.iter().zip(order_by_expr.iter()) {
+        // If the aggregation function is a non-reversible order-sensitive 
function
+        // and there is a hard requirement, choose first such requirement:
+        if is_order_sensitive(aggr_expr)
+            && aggr_expr.reverse_expr().is_none()
+            && fn_reqs.is_some()
+        {
+            return fn_reqs.clone();
+        }
+    }
+    None
+}
+
+fn get_finer_ordering<

Review Comment:
   I wonder if putting this check in `datafusion_physical_expr::utils` would 
make it easier to discover if it is needed in the future?



##########
datafusion/physical-expr/src/sort_expr.rs:
##########
@@ -214,5 +214,8 @@ fn to_str(options: &SortOptions) -> &str {
     }
 }
 
-/// `LexOrdering` is a type alias for lexicographical ordering definition 
`Vec<PhysicalSortExpr>`
+///`LexOrdering` is a type alias for lexicographical ordering 
definition`Vec<PhysicalSortExpr>`
 pub type LexOrdering = Vec<PhysicalSortExpr>;
+
+///`LexOrderingReq` is a type alias for lexicographical ordering requirement 
definition`Vec<PhysicalSortRequirement>`

Review Comment:
   👍 



##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -397,14 +454,111 @@ fn is_order_sensitive(aggr_expr: &Arc<dyn 
AggregateExpr>) -> bool {
         || aggr_expr.as_any().is::<ArrayAgg>()
 }
 
+/// Calculate the required input ordering for the [`AggregateExec`] by 
considering

Review Comment:
   I know you didn't add it in this PR, but it seems to me like 
   
   ```rust
   /// Checks whether the given aggregate expression is order-sensitive.
   /// For instance, a `SUM` aggregation doesn't depend on the order of its 
inputs.
   /// However, a `FirstAgg` depends on the input ordering (if the order 
changes,
   /// the first value in the list would change).
   fn is_order_sensitive(aggr_expr: &Arc<dyn AggregateExpr>) -> bool {
       aggr_expr.as_any().is::<FirstValue>()
           || aggr_expr.as_any().is::<LastValue>()
           || aggr_expr.as_any().is::<ArrayAgg>()
   }
   ```
   
   Might be better as a function of `AggregateExpr` (so that as people add new 
`AggregateExpr` they know there is some extra behavior used in the codebase) 
otherwise they will need to find this hard coded list
   
   



##########
datafusion/core/tests/sqllogictests/test_files/groupby.slt:
##########
@@ -2384,3 +2384,197 @@ SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.country 
DESC, s.amount DESC) AS
 FRA [200.0, 50.0] 250
 GRC [80.0, 30.0] 110
 TUR [100.0, 75.0] 175
+
+# test_reverse_aggregate_expr
+# Some of the Aggregators can be reversed, by this way we can still run 
aggregators
+# that have contradictory requirements at first glance.
+query TT
+EXPLAIN SELECT country, ARRAY_AGG(amount ORDER BY amount DESC) AS amounts,
+  FIRST_VALUE(amount ORDER BY amount ASC) AS fv1,
+  LAST_VALUE(amount ORDER BY amount DESC) AS fv2
+  FROM sales_global
+  GROUP BY country
+----
+logical_plan
+Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY 
[sales_global.amount DESC NULLS FIRST] AS amounts, 
FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] 
AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC 
NULLS FIRST] AS fv2
+--Aggregate: groupBy=[[sales_global.country]], 
aggr=[[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS 
FIRST], FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC 
NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC 
NULLS FIRST]]]
+----TableScan: sales_global projection=[country, amount]
+physical_plan
+ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) 
ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts, 
FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount 
DESC NULLS FIRST]@3 as fv2]
+--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), 
LAST_VALUE(sales_global.amount)]

Review Comment:
   I don't understand this plan -- if the sort is by `amount1 DESC` then isn't 
   
   I am expecting that these are the same:
   
   ```sql
     FIRST_VALUE(amount ORDER BY amount ASC) AS fv1,
     LAST_VALUE(amount ORDER BY amount DESC) AS fv2
   ```
   
   So when the orderby is rewritten to be `ORDER BY amount1 DESC` I expect to 
see that 
   
   ```sql
     LAST_VALUE(amount ORDER BY amount DESC) AS fv1, -- this got switched
     LAST_VALUE(amount ORDER BY amount DESC) AS fv2
   ```
   
   However, the query seems to get the right answer, so I must be missing 
something
   



##########
datafusion/core/tests/sqllogictests/test_files/groupby.slt:
##########
@@ -2384,3 +2384,197 @@ SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.country 
DESC, s.amount DESC) AS
 FRA [200.0, 50.0] 250
 GRC [80.0, 30.0] 110
 TUR [100.0, 75.0] 175
+
+# test_reverse_aggregate_expr
+# Some of the Aggregators can be reversed, by this way we can still run 
aggregators

Review Comment:
   Maybe mention that "we can still run aggregators without resorting"?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to