Jefffrey commented on code in PR #17367:
URL: https://github.com/apache/datafusion/pull/17367#discussion_r2366180090
##########
datafusion/sql/src/unparser/dialect.rs:
##########
@@ -267,6 +274,14 @@ impl Dialect for DefaultDialect {
pub struct PostgreSqlDialect {}
impl Dialect for PostgreSqlDialect {
+ fn supports_qualify(&self) -> bool {
+ false
+ }
+
+ fn requires_derived_table_alias(&self) -> bool {
+ true
+ }
Review Comment:
How do these changes relate to this fix?
##########
datafusion/sql/src/unparser/rewrite.rs:
##########
@@ -100,6 +100,79 @@ fn rewrite_sort_expr_for_union(exprs: Vec<SortExpr>) ->
Result<Vec<SortExpr>> {
Ok(sort_exprs)
}
+/// Rewrite Filter plans that have a Window as their input by inserting a
SubqueryAlias.
+///
+/// When a Filter directly operates on a Window plan, it can cause issues
during SQL unparsing
+/// because window functions in a WHERE clause are not valid SQL. The solution
is to wrap
+/// the Window plan in a SubqueryAlias, effectively creating a derived table.
+///
+/// Example transformation:
+///
+/// Filter: condition
+/// Window: window_function
+/// TableScan: table
+///
+/// becomes:
+///
+/// Filter: condition
+/// SubqueryAlias: __qualify_subquery
+/// Projection: table.column1, table.column2
+/// Window: window_function
+/// TableScan: table
+///
+pub(super) fn rewrite_qualify(plan: &LogicalPlan) -> Result<LogicalPlan> {
+ let plan = plan.clone();
+
+ let transformed_plan = plan.transform_up(|plan| match plan {
+ LogicalPlan::Filter(mut filter) => {
+ // Check if the filter's input is a Window plan
+ if matches!(&*filter.input, LogicalPlan::Window(_)) {
Review Comment:
```suggestion
// Check if the filter's input is a Window plan
LogicalPlan::Filter(mut filter)
if matches!(&*filter.input, LogicalPlan::Window(_)) =>
{
```
FYI, you can collapse the `if` into a match guard.
##########
datafusion/sql/src/unparser/rewrite.rs:
##########
@@ -100,6 +100,79 @@ fn rewrite_sort_expr_for_union(exprs: Vec<SortExpr>) ->
Result<Vec<SortExpr>> {
Ok(sort_exprs)
}
+/// Rewrite Filter plans that have a Window as their input by inserting a
SubqueryAlias.
+///
+/// When a Filter directly operates on a Window plan, it can cause issues
during SQL unparsing
+/// because window functions in a WHERE clause are not valid SQL. The solution
is to wrap
+/// the Window plan in a SubqueryAlias, effectively creating a derived table.
+///
+/// Example transformation:
+///
+/// Filter: condition
+/// Window: window_function
+/// TableScan: table
+///
+/// becomes:
+///
+/// Filter: condition
+/// SubqueryAlias: __qualify_subquery
+/// Projection: table.column1, table.column2
+/// Window: window_function
+/// TableScan: table
+///
+pub(super) fn rewrite_qualify(plan: &LogicalPlan) -> Result<LogicalPlan> {
+ let plan = plan.clone();
Review Comment:
```suggestion
pub(super) fn rewrite_qualify(plan: LogicalPlan) -> Result<LogicalPlan> {
```
If we clone the reference anyway, it's better to make it known in the signature
that we need ownership (and clone at the call site if required).
##########
datafusion/sql/tests/cases/plan_to_sql.rs:
##########
@@ -2521,6 +2523,69 @@ fn
test_unparse_left_semi_join_with_table_scan_projection() -> Result<()> {
Ok(())
}
+#[test]
+fn test_unparse_window() -> Result<()> {
+ // SubqueryAlias: t
+ // Projection: t.k, t.v, rank() PARTITION BY [t.k] ORDER BY [t.v ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS r
+ // Filter: rank() PARTITION BY [t.k] ORDER BY [t.v ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW = UInt64(1)
+ // WindowAggr: windowExpr=[[rank() PARTITION BY [t.k] ORDER BY [t.v
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+ // TableScan: t projection=[k, v]
+
+ let schema = Schema::new(vec![
+ Field::new("k", DataType::Int32, false),
+ Field::new("v", DataType::Int32, false),
+ ]);
+ let window_expr = Expr::WindowFunction(Box::new(WindowFunction {
+ fun: WindowFunctionDefinition::WindowUDF(rank_udwf()),
+ params: WindowFunctionParams {
+ args: vec![],
+ partition_by: vec![col("k")],
+ order_by: vec![col("v").sort(true, true)],
+ window_frame: WindowFrame::new(None),
+ null_treatment: None,
+ distinct: false,
+ filter: None,
+ },
+ }));
+ let table_scan = table_scan(Some("test"), &schema, Some(vec![0,
1]))?.build()?;
+ let plan = LogicalPlanBuilder::window_plan(table_scan, vec![window_expr])?;
+
+ let name = plan.schema().fields().last().unwrap().name().clone();
+ let plan = LogicalPlanBuilder::from(plan)
+ .filter(col(name.clone()).eq(lit(1i64)))?
+ .project(vec![col("k"), col("v"), col(name)])?
+ .build()?;
+
+ let unparser = Unparser::new(&UnparserPostgreSqlDialect {});
+ let sql = unparser.plan_to_sql(&plan)?;
+ assert_snapshot!(
+ sql,
+ @r#"SELECT "test"."k", "test"."v", "rank() PARTITION BY [test.k] ORDER
BY [test.v ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED
FOLLOWING" FROM (SELECT "test"."k" AS "k", "test"."v" AS "v", rank() OVER
(PARTITION BY "test"."k" ORDER BY "test"."v" ASC NULLS FIRST ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS "rank() PARTITION BY [test.k]
ORDER BY [test.v ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND
UNBOUNDED FOLLOWING" FROM "test") AS "test" WHERE ("rank() PARTITION BY
[test.k] ORDER BY [test.v ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND
UNBOUNDED FOLLOWING" = 1)"#
Review Comment:
These column names seem really chunky 🤔
Is it possible to also use the cases provided in the original issue, e.g.
```sql
select k, v, r
from (
select *, rank() over(partition by k order by v) as r
from t
) t
where r = 1
```
and
```sql
select *, rank() over(partition by k order by v) as r
from t
qualify r = 1;
```
And see if the `where r = 1` gets preserved?
##########
datafusion/sql/src/unparser/rewrite.rs:
##########
@@ -100,6 +100,79 @@ fn rewrite_sort_expr_for_union(exprs: Vec<SortExpr>) ->
Result<Vec<SortExpr>> {
Ok(sort_exprs)
}
+/// Rewrite Filter plans that have a Window as their input by inserting a
SubqueryAlias.
+///
+/// When a Filter directly operates on a Window plan, it can cause issues
during SQL unparsing
+/// because window functions in a WHERE clause are not valid SQL. The solution
is to wrap
+/// the Window plan in a SubqueryAlias, effectively creating a derived table.
+///
+/// Example transformation:
+///
+/// Filter: condition
+/// Window: window_function
+/// TableScan: table
+///
+/// becomes:
+///
+/// Filter: condition
+/// SubqueryAlias: __qualify_subquery
+/// Projection: table.column1, table.column2
+/// Window: window_function
+/// TableScan: table
+///
+pub(super) fn rewrite_qualify(plan: &LogicalPlan) -> Result<LogicalPlan> {
+ let plan = plan.clone();
+
+ let transformed_plan = plan.transform_up(|plan| match plan {
+ LogicalPlan::Filter(mut filter) => {
+ // Check if the filter's input is a Window plan
+ if matches!(&*filter.input, LogicalPlan::Window(_)) {
+ // Create a SubqueryAlias around the Window plan
+ let qualifiers = filter
+ .input
+ .schema()
+ .iter()
+ .filter(|(q, _)| q.is_some())
+ .flat_map(|(q, _)| q)
+ .collect::<Vec<_>>();
+
+ let qualifier = if qualifiers.is_empty() {
+ "__qualify_subquery".to_string()
+ } else {
+ qualifiers[0].to_string()
+ };
Review Comment:
What's the significance of this hardcoded `"__qualify_subquery"`? Can it be
demonstrated in a test?
Btw, you can avoid collecting into a `Vec` by doing something like this:
```rust
let qualifier = filter
.input
.schema()
.iter()
.find_map(|(q, _)| q)
.map(|q| q.to_string())
.unwrap_or_else(|| "__qualify_subquery".to_string());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]