alamb commented on a change in pull request #1506:
URL: https://github.com/apache/arrow-datafusion/pull/1506#discussion_r777023721
##########
File path: datafusion/src/logical_plan/expr.rs
##########
@@ -1317,6 +1318,79 @@ pub fn normalize_cols(
.collect()
}
+/// Rewrite sort on aggregate expressions to sort on the column of aggregate
output
+pub fn rewrite_sort_cols_by_aggs(
+ exprs: impl IntoIterator<Item = impl Into<Expr>>,
+ plan: &LogicalPlan,
+) -> Result<Vec<Expr>> {
+ exprs
+ .into_iter()
+ .map(|e| {
+ let expr = e.into();
+ match expr.clone() {
+ Expr::Sort {
+ expr,
+ asc,
+ nulls_first,
+ } => {
+ let sort = Expr::Sort {
+ expr: Box::new(rewrite_sort_col_by_aggs(*expr, plan)?),
+ asc,
+ nulls_first,
+ };
+ Ok(sort)
+ }
+ _ => Ok(expr),
Review comment:
I think you can avoid this clone via:
```suggestion
match expr {
Expr::Sort {
expr,
asc,
nulls_first,
} => {
let sort = Expr::Sort {
expr: Box::new(rewrite_sort_col_by_aggs(*expr,
plan)?),
asc,
nulls_first,
};
Ok(sort)
}
expr => Ok(expr),
```
##########
File path: datafusion/tests/sql/order.rs
##########
@@ -32,6 +32,27 @@ async fn test_sort_unprojected_col() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn test_order_by_agg_expr() -> Result<()> {
+ let mut ctx = ExecutionContext::new();
+ register_aggregate_csv(&mut ctx).await?;
+ let sql = "SELECT MIN(c12) FROM aggregate_test_100 ORDER BY MIN(c12)";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+-----------------------------+",
+ "| MIN(aggregate_test_100.c12) |",
+ "+-----------------------------+",
+ "| 0.01479305307777301 |",
+ "+-----------------------------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+
+ let sql = "SELECT MIN(c12) FROM aggregate_test_100 ORDER BY MIN(c12) +
0.1";
Review comment:
👍
##########
File path: datafusion/src/logical_plan/expr.rs
##########
@@ -1317,6 +1318,79 @@ pub fn normalize_cols(
.collect()
}
+/// Rewrite sort on aggregate expressions to sort on the column of aggregate
output
+pub fn rewrite_sort_cols_by_aggs(
+ exprs: impl IntoIterator<Item = impl Into<Expr>>,
+ plan: &LogicalPlan,
+) -> Result<Vec<Expr>> {
+ exprs
+ .into_iter()
+ .map(|e| {
+ let expr = e.into();
+ match expr.clone() {
+ Expr::Sort {
+ expr,
+ asc,
+ nulls_first,
+ } => {
+ let sort = Expr::Sort {
+ expr: Box::new(rewrite_sort_col_by_aggs(*expr, plan)?),
+ asc,
+ nulls_first,
+ };
+ Ok(sort)
+ }
+ _ => Ok(expr),
+ }
+ })
+ .collect()
+}
+
+fn rewrite_sort_col_by_aggs(expr: Expr, plan: &LogicalPlan) -> Result<Expr> {
+ match plan {
+ LogicalPlan::Aggregate(Aggregate {
+ input, aggr_expr, ..
+ }) => {
+ struct Rewriter<'a> {
+ plan: &'a LogicalPlan,
+ input: &'a LogicalPlan,
+ aggr_expr: &'a Vec<Expr>,
+ }
+
+ impl<'a> ExprRewriter for Rewriter<'a> {
+ fn mutate(&mut self, expr: Expr) -> Result<Expr> {
+ let normalized_expr = normalize_col(expr.clone(),
self.plan);
+ if normalized_expr.is_err() {
+ // The expr is not based on Aggregate plan output.
Skip it.
+ return Ok(expr);
+ }
+ let normalized_expr = normalized_expr.unwrap();
+ let found_agg =
+ self.aggr_expr.iter().find(|a| (**a) ==
normalized_expr);
+ if found_agg.is_some() {
+ let agg = normalize_col(found_agg.unwrap().clone(),
self.plan)?;
+ let col = Expr::Column(
+ agg.to_field(self.input.schema())
+ .map(|f| f.qualified_column())?,
+ );
+ Ok(col)
+ } else {
+ Ok(expr)
+ }
Review comment:
```suggestion
if let Some(found_agg) = self.aggr_expr.iter().find(|a|
(**a) == normalized_expr) {
let agg = normalize_col(found_agg, self.plan)?;
let col = Expr::Column(
agg.to_field(self.input.schema())
.map(|f| f.qualified_column())?,
);
Ok(col)
} else {
Ok(expr)
}
```
##########
File path: datafusion/src/logical_plan/expr.rs
##########
@@ -1317,6 +1318,79 @@ pub fn normalize_cols(
.collect()
}
+/// Rewrite sort on aggregate expressions to sort on the column of aggregate
output
+pub fn rewrite_sort_cols_by_aggs(
+ exprs: impl IntoIterator<Item = impl Into<Expr>>,
+ plan: &LogicalPlan,
+) -> Result<Vec<Expr>> {
+ exprs
+ .into_iter()
+ .map(|e| {
+ let expr = e.into();
+ match expr.clone() {
+ Expr::Sort {
+ expr,
+ asc,
+ nulls_first,
+ } => {
+ let sort = Expr::Sort {
+ expr: Box::new(rewrite_sort_col_by_aggs(*expr, plan)?),
+ asc,
+ nulls_first,
+ };
+ Ok(sort)
+ }
+ _ => Ok(expr),
+ }
+ })
+ .collect()
+}
+
+fn rewrite_sort_col_by_aggs(expr: Expr, plan: &LogicalPlan) -> Result<Expr> {
+ match plan {
+ LogicalPlan::Aggregate(Aggregate {
+ input, aggr_expr, ..
+ }) => {
+ struct Rewriter<'a> {
+ plan: &'a LogicalPlan,
+ input: &'a LogicalPlan,
+ aggr_expr: &'a Vec<Expr>,
+ }
+
+ impl<'a> ExprRewriter for Rewriter<'a> {
+ fn mutate(&mut self, expr: Expr) -> Result<Expr> {
+ let normalized_expr = normalize_col(expr.clone(),
self.plan);
+ if normalized_expr.is_err() {
+ // The expr is not based on Aggregate plan output.
Skip it.
+ return Ok(expr);
+ }
+ let normalized_expr = normalized_expr.unwrap();
+ let found_agg =
+ self.aggr_expr.iter().find(|a| (**a) ==
normalized_expr);
+ if found_agg.is_some() {
+ let agg = normalize_col(found_agg.unwrap().clone(),
self.plan)?;
+ let col = Expr::Column(
+ agg.to_field(self.input.schema())
+ .map(|f| f.qualified_column())?,
+ );
+ Ok(col)
+ } else {
+ Ok(expr)
+ }
Review comment:
I think you can avoid some `is_some()` and `unwrap()` calls here to make
the code stylistically nicer.
##########
File path: datafusion/src/logical_plan/expr.rs
##########
@@ -1317,6 +1318,79 @@ pub fn normalize_cols(
.collect()
}
+/// Rewrite sort on aggregate expressions to sort on the column of aggregate
output
Review comment:
```suggestion
/// Rewrite sort on aggregate expressions to sort on the column of aggregate output
/// For example, `max(x)` is rewritten to `col("MAX(x)")`
```
It took me a bit of thought to understand what this was doing so I think an
extra comment might help.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]