alamb commented on code in PR #6482:
URL: https://github.com/apache/arrow-datafusion/pull/6482#discussion_r1212262351
##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -339,40 +340,96 @@ fn output_group_expr_helper(group_by: &PhysicalGroupBy) -> Vec<Arc<dyn PhysicalE
.collect()
}
+/// This function returns the ordering requirement of the first non-reversible
+/// order-sensitive aggregate function such as ARRAY_AGG. This requirement serves
+/// as the initial requirement while calculating the finest requirement among all
+/// aggregate functions. If this function returns `None`, it means there is no
+/// hard ordering requirement for the aggregate functions (in terms of direction).
+/// Then, we can generate two alternative requirements with opposite directions.
+fn get_init_req(
+ aggr_expr: &[Arc<dyn AggregateExpr>],
+ order_by_expr: &[Option<LexOrdering>],
+) -> Option<LexOrdering> {
+ for (aggr_expr, fn_reqs) in aggr_expr.iter().zip(order_by_expr.iter()) {
+ // If the aggregation function is a non-reversible order-sensitive function
+ // and there is a hard requirement, choose first such requirement:
+ if is_order_sensitive(aggr_expr)
+ && aggr_expr.reverse_expr().is_none()
+ && fn_reqs.is_some()
+ {
+ return fn_reqs.clone();
+ }
+ }
+ None
+}
+
+fn get_finer_ordering<
Review Comment:
I wonder if putting this check in `datafusion_physical_expr::utils` would
make it easier to discover if it is needed in the future?
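
For readers following the thread, here is a minimal, self-contained sketch of the selection logic that `get_init_req` describes in the hunk above. `SortKey`, `Aggregate`, and the two boolean flags are hypothetical stand-ins for the PR's real types (`PhysicalSortExpr`, `Arc<dyn AggregateExpr>`, `is_order_sensitive`, `reverse_expr`); only the control flow is meant to mirror the diff.

```rust
/// Hypothetical, simplified sort key (stand-in for `PhysicalSortExpr`).
#[derive(Clone, Debug, PartialEq)]
pub struct SortKey {
    pub column: String,
    pub descending: bool,
}

/// Same alias name as in the PR, but over the simplified key type.
pub type LexOrdering = Vec<SortKey>;

/// Hypothetical stand-in for an aggregate expression.
pub struct Aggregate {
    /// Result depends on input order (assumed for ARRAY_AGG, FIRST_VALUE, LAST_VALUE).
    pub order_sensitive: bool,
    /// A reversed form exists (assumed for FIRST_VALUE / LAST_VALUE, not for ARRAY_AGG).
    pub reversible: bool,
}

/// Returns the ordering requirement of the first aggregate that is
/// order-sensitive, non-reversible, and carries an explicit requirement.
/// `None` means no aggregate pins the direction.
pub fn get_init_req(
    aggrs: &[Aggregate],
    order_bys: &[Option<LexOrdering>],
) -> Option<LexOrdering> {
    aggrs
        .iter()
        .zip(order_bys.iter())
        .find(|(aggr, req)| aggr.order_sensitive && !aggr.reversible && req.is_some())
        .and_then(|(_, req)| req.clone())
}
```

In this sketch, a reversible aggregate listed first is skipped, and the requirement of the first non-reversible one is returned, which matches the doc comment in the diff.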
##########
datafusion/physical-expr/src/sort_expr.rs:
##########
@@ -214,5 +214,8 @@ fn to_str(options: &SortOptions) -> &str {
}
}
-/// `LexOrdering` is a type alias for lexicographical ordering definition `Vec<PhysicalSortExpr>`
+///`LexOrdering` is a type alias for lexicographical ordering definition`Vec<PhysicalSortExpr>`
pub type LexOrdering = Vec<PhysicalSortExpr>;
+
+///`LexOrderingReq` is a type alias for lexicographical ordering requirement definition`Vec<PhysicalSortRequirement>`
Review Comment:
👍
##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -397,14 +454,111 @@ fn is_order_sensitive(aggr_expr: &Arc<dyn AggregateExpr>) -> bool {
|| aggr_expr.as_any().is::<ArrayAgg>()
}
+/// Calculate the required input ordering for the [`AggregateExec`] by considering
Review Comment:
I know you didn't add it in this PR, but it seems to me like
```rust
/// Checks whether the given aggregate expression is order-sensitive.
/// For instance, a `SUM` aggregation doesn't depend on the order of its inputs.
/// However, a `FirstAgg` depends on the input ordering (if the order changes,
/// the first value in the list would change).
fn is_order_sensitive(aggr_expr: &Arc<dyn AggregateExpr>) -> bool {
    aggr_expr.as_any().is::<FirstValue>()
        || aggr_expr.as_any().is::<LastValue>()
        || aggr_expr.as_any().is::<ArrayAgg>()
}
```
might be better as a method on `AggregateExpr`, so that people adding a new
`AggregateExpr` know that the codebase relies on this extra behavior.
Otherwise they will need to find this hard-coded list.
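
A rough sketch of what that could look like, assuming a trait method with a default implementation. The trait below is a simplified, hypothetical stand-in, not DataFusion's actual `AggregateExpr`, and the method name `is_order_sensitive` on the trait is only a suggestion.

```rust
/// Simplified, hypothetical stand-in for the `AggregateExpr` trait.
pub trait AggregateExpr {
    /// Human-readable name of the aggregate.
    fn name(&self) -> &str;

    /// Suggested hook: does the result depend on the order of input rows?
    /// Defaulting to `false` means existing order-insensitive aggregates
    /// need no changes; order-sensitive ones opt in by overriding it.
    fn is_order_sensitive(&self) -> bool {
        false
    }
}

/// Order-insensitive aggregate: relies on the default.
pub struct Sum;

/// Order-sensitive aggregate: overrides the default.
pub struct FirstValue;

impl AggregateExpr for Sum {
    fn name(&self) -> &str {
        "SUM"
    }
}

impl AggregateExpr for FirstValue {
    fn name(&self) -> &str {
        "FIRST_VALUE"
    }

    fn is_order_sensitive(&self) -> bool {
        true
    }
}
```

With this shape, callers would ask the expression directly instead of downcasting through `as_any()`, and the method shows up in the trait definition where new implementors will see it.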
##########
datafusion/core/tests/sqllogictests/test_files/groupby.slt:
##########
@@ -2384,3 +2384,197 @@ SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.country DESC, s.amount DESC) AS
FRA [200.0, 50.0] 250
GRC [80.0, 30.0] 110
TUR [100.0, 75.0] 175
+
+# test_reverse_aggregate_expr
+# Some of the Aggregators can be reversed, by this way we can still run aggregators
+# that have contradictory requirements at first glance.
+query TT
+EXPLAIN SELECT country, ARRAY_AGG(amount ORDER BY amount DESC) AS amounts,
+ FIRST_VALUE(amount ORDER BY amount ASC) AS fv1,
+ LAST_VALUE(amount ORDER BY amount DESC) AS fv2
+ FROM sales_global
+ GROUP BY country
+----
+logical_plan
+Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST] AS amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST] AS fv2
+--Aggregate: groupBy=[[sales_global.country]], aggr=[[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST], FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]]]
+----TableScan: sales_global projection=[country, amount]
+physical_plan
+ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
Review Comment:
I don't understand this plan. If the sort is by `amount1 DESC`, then I would
expect these two to be the same:
```sql
FIRST_VALUE(amount ORDER BY amount ASC) AS fv1,
LAST_VALUE(amount ORDER BY amount DESC) AS fv2
```
So when the ORDER BY is rewritten to be `ORDER BY amount1 DESC`, I expect to
see:
```sql
LAST_VALUE(amount ORDER BY amount DESC) AS fv1, -- this got switched
LAST_VALUE(amount ORDER BY amount DESC) AS fv2
```
However, the query seems to get the right answer, so I must be missing
something.
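
For reference, the equivalence this comment relies on (the first value under ascending order equals the last value under descending order) can be checked with a small standalone sketch. The helper names are hypothetical, the sketch sorts a plain slice rather than using DataFusion, and it ignores NULL handling.

```rust
/// First element after sorting ascending, i.e. FIRST_VALUE(x ORDER BY x ASC).
fn first_value_asc(values: &[f64]) -> Option<f64> {
    let mut sorted = values.to_vec();
    sorted.sort_by(|a, b| a.total_cmp(b));
    sorted.first().copied()
}

/// Last element after sorting descending, i.e. LAST_VALUE(x ORDER BY x DESC).
fn last_value_desc(values: &[f64]) -> Option<f64> {
    let mut sorted = values.to_vec();
    sorted.sort_by(|a, b| b.total_cmp(a));
    sorted.last().copied()
}
```

Both helpers return the minimum of the input, so a single descending sort can in principle serve both aggregates if the ascending `FIRST_VALUE` is evaluated as a `LAST_VALUE` over that sort.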
##########
datafusion/core/tests/sqllogictests/test_files/groupby.slt:
##########
@@ -2384,3 +2384,197 @@ SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.country DESC, s.amount DESC) AS
FRA [200.0, 50.0] 250
GRC [80.0, 30.0] 110
TUR [100.0, 75.0] 175
+
+# test_reverse_aggregate_expr
+# Some of the Aggregators can be reversed, by this way we can still run aggregators
Review Comment:
Maybe mention that "we can still run aggregators without resorting"?