This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new f52c56b1c7 Support unparsing implicit lateral `UNNEST` plan to SQL
text (#13824)
f52c56b1c7 is described below
commit f52c56b1c7fd251fd360cb3cefa21d3803bdd733
Author: Jax Liu <[email protected]>
AuthorDate: Wed Dec 25 20:16:47 2024 +0800
Support unparsing implicit lateral `UNNEST` plan to SQL text (#13824)
* support unparsing the implicit lateral unnest plan
* cargo clippy and fmt
* refactor for `check_unnest_placeholder_with_outer_ref`
* add const for the prefix string of unnest and outer reference column
---
datafusion/expr/src/expr.rs | 9 +++-
datafusion/sql/src/unparser/plan.rs | 83 ++++++++++++++++++++++++++-----
datafusion/sql/src/unparser/rewrite.rs | 58 +++++++++++++++++++--
datafusion/sql/src/unparser/utils.rs | 25 ++++++++++
datafusion/sql/tests/cases/plan_to_sql.rs | 24 +++++++++
5 files changed, 181 insertions(+), 18 deletions(-)
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index 79e846e7af..b8e495ee7a 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -2536,6 +2536,9 @@ pub fn schema_name_from_sorts(sorts: &[Sort]) ->
Result<String, fmt::Error> {
Ok(s)
}
+pub const OUTER_REFERENCE_COLUMN_PREFIX: &str = "outer_ref";
+pub const UNNEST_COLUMN_PREFIX: &str = "UNNEST";
+
/// Format expressions for display as part of a logical plan. In many cases,
this will produce
/// similar output to `Expr.name()` except that column names will be prefixed
with '#'.
impl Display for Expr {
@@ -2543,7 +2546,9 @@ impl Display for Expr {
match self {
Expr::Alias(Alias { expr, name, .. }) => write!(f, "{expr} AS
{name}"),
Expr::Column(c) => write!(f, "{c}"),
- Expr::OuterReferenceColumn(_, c) => write!(f, "outer_ref({c})"),
+ Expr::OuterReferenceColumn(_, c) => {
+ write!(f, "{OUTER_REFERENCE_COLUMN_PREFIX}({c})")
+ }
Expr::ScalarVariable(_, var_names) => write!(f, "{}",
var_names.join(".")),
Expr::Literal(v) => write!(f, "{v:?}"),
Expr::Case(case) => {
@@ -2736,7 +2741,7 @@ impl Display for Expr {
},
Expr::Placeholder(Placeholder { id, .. }) => write!(f, "{id}"),
Expr::Unnest(Unnest { expr }) => {
- write!(f, "UNNEST({expr})")
+ write!(f, "{UNNEST_COLUMN_PREFIX}({expr})")
}
}
}
diff --git a/datafusion/sql/src/unparser/plan.rs
b/datafusion/sql/src/unparser/plan.rs
index f2d46a9f4c..2574ae5d52 100644
--- a/datafusion/sql/src/unparser/plan.rs
+++ b/datafusion/sql/src/unparser/plan.rs
@@ -33,13 +33,14 @@ use super::{
Unparser,
};
use crate::unparser::ast::UnnestRelationBuilder;
-use crate::unparser::utils::unproject_agg_exprs;
+use crate::unparser::utils::{find_unnest_node_until_relation,
unproject_agg_exprs};
use crate::utils::UNNEST_PLACEHOLDER;
use datafusion_common::{
internal_err, not_impl_err,
tree_node::{TransformedResult, TreeNode},
Column, DataFusionError, Result, TableReference,
};
+use datafusion_expr::expr::OUTER_REFERENCE_COLUMN_PREFIX;
use datafusion_expr::{
expr::Alias, BinaryExpr, Distinct, Expr, JoinConstraint, JoinType,
LogicalPlan,
LogicalPlanBuilder, Operator, Projection, SortExpr, TableScan, Unnest,
@@ -235,9 +236,10 @@ impl Unparser<'_> {
plan: &LogicalPlan,
relation: &mut RelationBuilder,
alias: Option<ast::TableAlias>,
+ lateral: bool,
) -> Result<()> {
let mut derived_builder = DerivedRelationBuilder::default();
- derived_builder.lateral(false).alias(alias).subquery({
+ derived_builder.lateral(lateral).alias(alias).subquery({
let inner_statement = self.plan_to_sql(plan)?;
if let ast::Statement::Query(inner_query) = inner_statement {
inner_query
@@ -257,15 +259,17 @@ impl Unparser<'_> {
alias: &str,
plan: &LogicalPlan,
relation: &mut RelationBuilder,
+ lateral: bool,
) -> Result<()> {
if self.dialect.requires_derived_table_alias() {
self.derive(
plan,
relation,
Some(self.new_table_alias(alias.to_string(), vec![])),
+ lateral,
)
} else {
- self.derive(plan, relation, None)
+ self.derive(plan, relation, None, lateral)
}
}
@@ -317,10 +321,12 @@ impl Unparser<'_> {
// Projection can be top-level plan for unnest relation
// The projection generated by the `RecursiveUnnestRewriter`
from a UNNEST relation will have
// only one expression, which is the placeholder column
generated by the rewriter.
- if self.dialect.unnest_as_table_factor()
- && p.expr.len() == 1
- && Self::is_unnest_placeholder(&p.expr[0])
- {
+ let unnest_input_type = if p.expr.len() == 1 {
+ Self::check_unnest_placeholder_with_outer_ref(&p.expr[0])
+ } else {
+ None
+ };
+ if self.dialect.unnest_as_table_factor() &&
unnest_input_type.is_some() {
if let LogicalPlan::Unnest(unnest) = &p.input.as_ref() {
return self
.unnest_to_table_factor_sql(unnest, query, select,
relation);
@@ -333,6 +339,9 @@ impl Unparser<'_> {
"derived_projection",
plan,
relation,
+ unnest_input_type
+ .filter(|t| matches!(t,
UnnestInputType::OuterReference))
+ .is_some(),
);
}
self.reconstruct_select_statement(plan, p, select)?;
@@ -365,6 +374,7 @@ impl Unparser<'_> {
"derived_limit",
plan,
relation,
+ false,
);
}
if let Some(fetch) = &limit.fetch {
@@ -402,6 +412,7 @@ impl Unparser<'_> {
"derived_sort",
plan,
relation,
+ false,
);
}
let Some(query_ref) = query else {
@@ -472,6 +483,7 @@ impl Unparser<'_> {
"derived_distinct",
plan,
relation,
+ false,
);
}
let (select_distinct, input) = match distinct {
@@ -658,6 +670,7 @@ impl Unparser<'_> {
"derived_union",
plan,
relation,
+ false,
);
}
@@ -723,19 +736,54 @@ impl Unparser<'_> {
internal_err!("Unnest input is not a Projection:
{unnest:?}")
}
}
- _ => not_impl_err!("Unsupported operator: {plan:?}"),
+ LogicalPlan::Subquery(subquery)
+ if find_unnest_node_until_relation(subquery.subquery.as_ref())
+ .is_some() =>
+ {
+ if self.dialect.unnest_as_table_factor() {
+ self.select_to_sql_recursively(
+ subquery.subquery.as_ref(),
+ query,
+ select,
+ relation,
+ )
+ } else {
+ self.derive_with_dialect_alias(
+ "derived_unnest",
+ subquery.subquery.as_ref(),
+ relation,
+ true,
+ )
+ }
+ }
+ _ => {
+ not_impl_err!("Unsupported operator: {plan:?}")
+ }
}
}
- /// Try to find the placeholder column name generated by
`RecursiveUnnestRewriter`
- /// Only match the pattern
`Expr::Alias(Expr::Column("__unnest_placeholder(...)"))`
- fn is_unnest_placeholder(expr: &Expr) -> bool {
+ /// Try to find the placeholder column name generated by
`RecursiveUnnestRewriter`.
+ ///
+ /// - If the column is a placeholder column match the pattern
`Expr::Alias(Expr::Column("__unnest_placeholder(...)"))`,
+ /// it means it is a scalar column, return [UnnestInputType::Scalar].
+ /// - If the column is a placeholder column match the pattern
`Expr::Alias(Expr::Column("__unnest_placeholder(outer_ref(...)))")`,
+ /// it means it is an outer reference column, return
[UnnestInputType::OuterReference].
+ /// - If the column is not a placeholder column, return [None].
+ ///
+ /// `outer_ref` is the display result of [Expr::OuterReferenceColumn]
+ fn check_unnest_placeholder_with_outer_ref(expr: &Expr) ->
Option<UnnestInputType> {
if let Expr::Alias(Alias { expr, .. }) = expr {
if let Expr::Column(Column { name, .. }) = expr.as_ref() {
- return name.starts_with(UNNEST_PLACEHOLDER);
+ if let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER) {
+ if prefix.starts_with(&format!("({}(",
OUTER_REFERENCE_COLUMN_PREFIX))
+ {
+ return Some(UnnestInputType::OuterReference);
+ }
+ return Some(UnnestInputType::Scalar);
+ }
}
}
- false
+ None
}
fn unnest_to_table_factor_sql(
@@ -1092,3 +1140,12 @@ impl From<BuilderError> for DataFusionError {
DataFusionError::External(Box::new(e))
}
}
+
+/// The type of the input to the UNNEST table factor.
+#[derive(Debug)]
+enum UnnestInputType {
+ /// The input is a column reference. It will be presented like
`outer_ref(column_name)`.
+ OuterReference,
+ /// The input is a scalar value. It will be presented like a scalar array
or struct.
+ Scalar,
+}
diff --git a/datafusion/sql/src/unparser/rewrite.rs
b/datafusion/sql/src/unparser/rewrite.rs
index 68af121a41..db98374831 100644
--- a/datafusion/sql/src/unparser/rewrite.rs
+++ b/datafusion/sql/src/unparser/rewrite.rs
@@ -23,7 +23,7 @@ use datafusion_common::{
tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRewriter},
Column, HashMap, Result, TableReference,
};
-use datafusion_expr::expr::Alias;
+use datafusion_expr::expr::{Alias, UNNEST_COLUMN_PREFIX};
use datafusion_expr::{Expr, LogicalPlan, Projection, Sort, SortExpr};
use sqlparser::ast::Ident;
@@ -190,10 +190,11 @@ pub(super) fn
rewrite_plan_for_sort_on_non_projected_fields(
}
}
-/// This logic is to work out the columns and inner query for SubqueryAlias
plan for both types of
-/// subquery
+/// This logic is to work out the columns and inner query for SubqueryAlias
plan for some types of
+/// subquery or unnest
/// - `(SELECT column_a as a from table) AS A`
/// - `(SELECT column_a from table) AS A (a)`
+/// - `SELECT * FROM t1 CROSS JOIN UNNEST(t1.c1) AS u(c1)` (see
[find_unnest_column_alias])
///
/// A roundtrip example for table alias with columns
///
@@ -222,6 +223,15 @@ pub(super) fn subquery_alias_inner_query_and_columns(
) -> (&LogicalPlan, Vec<Ident>) {
let plan: &LogicalPlan = subquery_alias.input.as_ref();
+ if let LogicalPlan::Subquery(subquery) = plan {
+ let (inner_projection, Some(column)) =
+ find_unnest_column_alias(subquery.subquery.as_ref())
+ else {
+ return (plan, vec![]);
+ };
+ return (inner_projection, vec![Ident::new(column)]);
+ }
+
let LogicalPlan::Projection(outer_projections) = plan else {
return (plan, vec![]);
};
@@ -257,6 +267,48 @@ pub(super) fn subquery_alias_inner_query_and_columns(
(outer_projections.input.as_ref(), columns)
}
+/// Try to find the column alias for UNNEST in the inner projection.
+/// For example:
+/// ```sql
+/// SELECT * FROM t1 CROSS JOIN UNNEST(t1.c1) AS u(c1)
+/// ```
+/// The above query will be parsed into the following plan:
+/// ```text
+/// Projection: *
+/// Cross Join:
+/// SubqueryAlias: t1
+/// TableScan: t
+/// SubqueryAlias: u
+/// Subquery:
+/// Projection: UNNEST(outer_ref(t1.c1)) AS c1
+/// Projection: __unnest_placeholder(outer_ref(t1.c1),depth=1) AS
UNNEST(outer_ref(t1.c1))
+/// Unnest: lists[__unnest_placeholder(outer_ref(t1.c1))|depth=1]
structs[]
+/// Projection: outer_ref(t1.c1) AS
__unnest_placeholder(outer_ref(t1.c1))
+/// EmptyRelation
+/// ```
+/// The function will return the inner projection and the column alias `c1` if
the column name
+/// starts with `UNNEST(` (the `Display` result of [Expr::Unnest]) in the
inner projection.
+pub(super) fn find_unnest_column_alias(
+ plan: &LogicalPlan,
+) -> (&LogicalPlan, Option<String>) {
+ if let LogicalPlan::Projection(projection) = plan {
+ if projection.expr.len() != 1 {
+ return (plan, None);
+ }
+ if let Some(Expr::Alias(alias)) = projection.expr.first() {
+ if alias
+ .expr
+ .schema_name()
+ .to_string()
+ .starts_with(&format!("{UNNEST_COLUMN_PREFIX}("))
+ {
+ return (projection.input.as_ref(), Some(alias.name.clone()));
+ }
+ }
+ }
+ (plan, None)
+}
+
/// Injects column aliases into a subquery's logical plan. The function
searches for a `Projection`
/// within the given plan, which may be wrapped by other operators (e.g.,
LIMIT, SORT).
/// If the top-level plan is a `Projection`, it directly injects the column
aliases.
diff --git a/datafusion/sql/src/unparser/utils.rs
b/datafusion/sql/src/unparser/utils.rs
index 3a7fa5ddca..f21fb2fcb4 100644
--- a/datafusion/sql/src/unparser/utils.rs
+++ b/datafusion/sql/src/unparser/utils.rs
@@ -89,6 +89,31 @@ pub(crate) fn find_unnest_node_within_select(plan:
&LogicalPlan) -> Option<&Unne
}
}
+/// Recursively searches children of [LogicalPlan] to find Unnest node if exist
+/// until encountering a Relation node with single input
+pub(crate) fn find_unnest_node_until_relation(plan: &LogicalPlan) ->
Option<&Unnest> {
+ // Note that none of the nodes that have a corresponding node can have more
+ // than 1 input node. E.g. Projection / Filter always have 1 input node.
+ let input = plan.inputs();
+ let input = if input.len() > 1 {
+ return None;
+ } else {
+ input.first()?
+ };
+
+ if let LogicalPlan::Unnest(unnest) = input {
+ Some(unnest)
+ } else if let LogicalPlan::TableScan(_) = input {
+ None
+ } else if let LogicalPlan::Subquery(_) = input {
+ None
+ } else if let LogicalPlan::SubqueryAlias(_) = input {
+ None
+ } else {
+ find_unnest_node_within_select(input)
+ }
+}
+
/// Recursively searches children of [LogicalPlan] to find Window nodes if
exist
/// prior to encountering a Join, TableScan, or a nested subquery (derived
table factor).
/// If Window node is not found prior to this or at all before reaching the end
diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs
b/datafusion/sql/tests/cases/plan_to_sql.rs
index 236b59432a..2905ba104c 100644
--- a/datafusion/sql/tests/cases/plan_to_sql.rs
+++ b/datafusion/sql/tests/cases/plan_to_sql.rs
@@ -615,6 +615,30 @@ fn roundtrip_statement_with_dialect() -> Result<()> {
parser_dialect: Box::new(GenericDialect {}),
unparser_dialect:
Box::new(CustomDialectBuilder::default().with_unnest_as_table_factor(true).build()),
},
+ TestStatementWithDialect {
+ sql: "SELECT * FROM unnest_table u, UNNEST(u.array_col)",
+ expected: r#"SELECT * FROM unnest_table AS u CROSS JOIN
UNNEST(u.array_col)"#,
+ parser_dialect: Box::new(GenericDialect {}),
+ unparser_dialect:
Box::new(CustomDialectBuilder::default().with_unnest_as_table_factor(true).build()),
+ },
+ TestStatementWithDialect {
+ sql: "SELECT * FROM unnest_table u, UNNEST(u.array_col) AS t1
(c1)",
+ expected: r#"SELECT * FROM unnest_table AS u CROSS JOIN
UNNEST(u.array_col) AS t1 (c1)"#,
+ parser_dialect: Box::new(GenericDialect {}),
+ unparser_dialect:
Box::new(CustomDialectBuilder::default().with_unnest_as_table_factor(true).build()),
+ },
+ TestStatementWithDialect {
+ sql: "SELECT * FROM unnest_table u, UNNEST(u.array_col)",
+ expected: r#"SELECT * FROM unnest_table AS u CROSS JOIN LATERAL
(SELECT UNNEST(u.array_col) AS "UNNEST(outer_ref(u.array_col))")"#,
+ parser_dialect: Box::new(GenericDialect {}),
+ unparser_dialect: Box::new(UnparserDefaultDialect {}),
+ },
+ TestStatementWithDialect {
+ sql: "SELECT * FROM unnest_table u, UNNEST(u.array_col) AS t1
(c1)",
+ expected: r#"SELECT * FROM unnest_table AS u CROSS JOIN LATERAL
(SELECT UNNEST(u.array_col) AS "UNNEST(outer_ref(u.array_col))") AS t1 (c1)"#,
+ parser_dialect: Box::new(GenericDialect {}),
+ unparser_dialect: Box::new(UnparserDefaultDialect {}),
+ },
];
for query in tests {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]