alamb commented on code in PR #10429:
URL: https://github.com/apache/datafusion/pull/10429#discussion_r1608917787


##########
datafusion/sqllogictest/test_files/unnest.slt:
##########
@@ -288,6 +308,18 @@ select unnest(array_remove(column1, 12)) from unnest_table;
 5
 6
 
+## unnest struct-typed column and list-typed column at the same time
+query I?II?
+select unnest(column1), column1, unnest(column5), column5 from unnest_table;
+----
+1 [1, 2, 3] 1 2 {c0: 1, c1: 2}
+2 [1, 2, 3] 1 2 {c0: 1, c1: 2}
+3 [1, 2, 3] 1 2 {c0: 1, c1: 2}
+4 [4, 5] 3 4 {c0: 3, c1: 4}
+5 [4, 5] 3 4 {c0: 3, c1: 4}
+6 [6] NULL NULL NULL
+12 [12] 7 8 {c0: 7, c1: 8}
+

Review Comment:
   Can we please add some other tests:
   
   1. Test the output type (e.g. `select arrow_typeof(unnest(column1))`)
   2. Test nested structs (I tested this and it works well, but I think we should have 
some coverage)
   
   
   Nested structs
   ```sql
   > create or replace table t as values (struct('a', 'b', struct('c'))), 
(struct('d', 'e', struct('f')));
   0 row(s) fetched.
   Elapsed 0.031 seconds.
   
   > select * from t;
   +-----------------------------+
   | column1                     |
   +-----------------------------+
   | {c0: a, c1: b, c2: {c0: c}} |
   | {c0: d, c1: e, c2: {c0: f}} |
   +-----------------------------+
   2 row(s) fetched.
   Elapsed 0.006 seconds.
   
   > select unnest(column1) from t;
   +----------------------+----------------------+----------------------+
   | unnest(t.column1).c0 | unnest(t.column1).c1 | unnest(t.column1).c2 |
   +----------------------+----------------------+----------------------+
   | a                    | b                    | {c0: c}              |
   | d                    | e                    | {c0: f}              |
   +----------------------+----------------------+----------------------+
   2 row(s) fetched.
   Elapsed 0.013 seconds.
   ```
   
   And a struct containing a list:
   
   ```sql
   > create or replace table t as values (struct('a', 'b', [1,2,3])), 
(struct('x', 'y', [10,20]));
   0 row(s) fetched.
   Elapsed 0.010 seconds.
   
   > select unnest(column1) from t;
   +----------------------+----------------------+----------------------+
   | unnest(t.column1).c0 | unnest(t.column1).c1 | unnest(t.column1).c2 |
   +----------------------+----------------------+----------------------+
   | a                    | b                    | [1, 2, 3]            |
   | x                    | y                    | [10, 20]             |
   +----------------------+----------------------+----------------------+
   2 row(s) fetched.
   Elapsed 0.008 seconds.
   ```



##########
datafusion/expr/src/expr_schema.rs:
##########
@@ -123,7 +123,8 @@ impl ExprSchemable for Expr {
                         Ok(field.data_type().clone())
                     }
                     DataType::Struct(_) => {
-                        not_impl_err!("unnest() does not support struct yet")
+                        // TODO: this is not correct, because unnest(struct) 
wll result into multiple data_type

Review Comment:
   When do we end up with an `unnest` **Expr** (vs a `LogicalPlan::Unnest`)?
   
   Could we simply keep returning a `not_impl_err!` (not yet implemented) error here?



##########
datafusion/sqllogictest/test_files/unnest.slt:
##########
@@ -288,6 +308,18 @@ select unnest(array_remove(column1, 12)) from unnest_table;
 5
 6
 
+## unnest struct-typed column and list-typed column at the same time
+query I?II?
+select unnest(column1), column1, unnest(column5), column5 from unnest_table;
+----
+1 [1, 2, 3] 1 2 {c0: 1, c1: 2}
+2 [1, 2, 3] 1 2 {c0: 1, c1: 2}
+3 [1, 2, 3] 1 2 {c0: 1, c1: 2}
+4 [4, 5] 3 4 {c0: 3, c1: 4}
+5 [4, 5] 3 4 {c0: 3, c1: 4}
+6 [6] NULL NULL NULL
+12 [12] 7 8 {c0: 7, c1: 8}
+

Review Comment:
   Interestingly, checking the output type with `arrow_typeof` doesn't seem to work (which probably makes 
sense as unnest needs to be converted to a LogicalPlan). I don't think we have 
to fix this in this PR (but adding coverage would be good)
   
   ```sql
   > select unnest(column1) from t;
   +----------------------+----------------------+----------------------+
   | unnest(t.column1).c0 | unnest(t.column1).c1 | unnest(t.column1).c2 |
   +----------------------+----------------------+----------------------+
   | a                    | b                    | [1, 2, 3]            |
   | x                    | y                    | [10, 20]             |
   +----------------------+----------------------+----------------------+
   2 row(s) fetched.
   
   > select arrow_typeof(unnest(column1)) from t;
   Internal error: unnest on struct can ony be applied at the root level of 
select expression.
   This was likely caused by a bug in DataFusion's code and we would welcome 
that you file an bug report in our issue tracker
   ```



##########
datafusion/expr/src/logical_plan/builder.rs:
##########
@@ -1592,7 +1592,47 @@ impl TableSource for LogicalTableSource {
 
 /// Create a [`LogicalPlan::Unnest`] plan
 pub fn unnest(input: LogicalPlan, columns: Vec<Column>) -> Result<LogicalPlan> 
{
-    unnest_with_options(input, columns, UnnestOptions::new())
+    unnest_with_options(input, columns, UnnestOptions::default())
+}
+
+pub fn get_unnested_columns(

Review Comment:
   Could you please add some documentation explaining what this function does?
   
   Also, I wonder if it needs to be `pub`, or if we could make it `pub(crate)` 
or private?



##########
datafusion/sql/src/utils.rs:
##########
@@ -255,3 +256,188 @@ pub(crate) fn normalize_ident(id: Ident) -> String {
         None => id.value.to_ascii_lowercase(),
     }
 }
+
+/// The context is we want to rewrite unnest() into 
InnerProjection->Unnest->OuterProjection
+/// Given an expression which contains unnest expr as one of its children,
+/// Try transform depends on unnest type
+/// - For list column: unnest(col) with type list -> unnest(col) with type 
list::item
+/// - For struct column: unnest(struct(field1, field2)) -> 
unnest(struct).field1, unnest(struct).field2
+/// The transformed exprs will be used in the outer projection
+pub(crate) fn recursive_transform_unnest(
+    input: &LogicalPlan,
+    unnest_placeholder_columns: &mut Vec<String>,
+    inner_projection_exprs: &mut Vec<Expr>,
+    original_expr: Expr,
+) -> Result<Vec<Expr>> {
+    let mut transform =
+        |unnest_expr: &Expr, expr_in_unnest: &Expr| -> Result<Vec<Expr>> {
+            // Full context, we are trying to plan the execution as 
InnerProjection->Unnest->OuterProjection
+            // inside unnest execution, each column inside the inner projection
+            // will be transformed into new columns. Thus we need to keep 
track of these placeholding column names
+            let placeholder_name = unnest_expr.display_name()?;
+
+            unnest_placeholder_columns.push(placeholder_name.clone());
+            // Add alias for the argument expression, to avoid naming conflicts
+            // with other expressions in the select list. For example: `select 
unnest(col1), col1 from t`.
+            // this extra projection is used to unnest transforming
+            inner_projection_exprs
+                .push(expr_in_unnest.clone().alias(placeholder_name.clone()));
+            let schema = input.schema();
+
+            let (data_type, _) = 
expr_in_unnest.data_type_and_nullable(schema)?;
+
+            let outer_projection_columns =
+                get_unnested_columns(&placeholder_name, &data_type)?;
+            let expr = outer_projection_columns
+                .iter()
+                .map(|col| Expr::Column(col.0.clone()))
+                .collect::<Vec<_>>();
+            Ok(expr)
+        };
+    // expr transformed maybe either the same, or different from the originals 
exprs
+    // for example:
+    // - unnest(struct_col) will be transformed into 
unnest(struct_col).field1, unnest(struct_col).field2
+    // - unnest(array_col) will be transformed into unnest(array_col)
+    // - unnest(array_col) + 1 will be transformed into unnest(array_col) + 1
+
+    // Specifically handle root level unnest expr, this is the only place
+    // unnest on struct can be handled
+    if let Expr::Unnest(Unnest { expr: ref arg }) = original_expr {
+        return transform(&original_expr, arg);
+    }
+    let Transformed {
+            data: transformed_expr,
+            transformed,
+            tnr: _,
+        } = original_expr.transform_up(|expr: Expr| {
+            if let Expr::Unnest(Unnest { expr: ref arg }) = expr {
+                let (data_type, _) = 
expr.data_type_and_nullable(input.schema())?;
+                if let DataType::Struct(_) = data_type {
+                    return internal_err!("unnest on struct can ony be applied 
at the root level of select expression");
+                }
+                let transformed_exprs = transform(&expr, arg)?;
+                Ok(Transformed::yes(transformed_exprs[0].clone()))
+            } else {
+                Ok(Transformed::no(expr))
+            }
+        })?;
+
+    if !transformed {
+        if matches!(&transformed_expr, Expr::Column(_)) {
+            inner_projection_exprs.push(transformed_expr.clone());
+            Ok(vec![transformed_expr])
+        } else {
+            // We need to evaluate the expr in the inner projection,
+            // outer projection just select its name
+            let column_name = transformed_expr.display_name()?;
+            inner_projection_exprs.push(transformed_expr);
+            Ok(vec![Expr::Column(Column::from_name(column_name))])
+        }
+    } else {
+        Ok(vec![transformed_expr])
+    }
+}
+
+// write test for recursive_transform_unnest
+#[cfg(test)]
+mod tests {
+    use std::{ops::Add, sync::Arc};
+
+    use super::*;
+    use arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
+    use arrow_schema::Fields;
+    use datafusion_common::DFSchema;
+    use datafusion_expr::EmptyRelation;
+
+    #[test]
+    fn test_recursive_transform_unnest() -> Result<()> {
+        let schema = Schema::new(vec![
+            Field::new(
+                "struct_col",
+                ArrowDataType::Struct(Fields::from(vec![
+                    Field::new("field1", ArrowDataType::Int32, false),
+                    Field::new("field2", ArrowDataType::Int32, false),
+                ])),
+                false,
+            ),
+            Field::new(
+                "array_col",
+                ArrowDataType::List(Arc::new(Field::new("item", 
DataType::Int64, true))),
+                true,
+            ),
+            Field::new("int_col", ArrowDataType::Int32, false),
+        ]);
+
+        let dfschema = DFSchema::try_from(schema)?;
+
+        let input = LogicalPlan::EmptyRelation(EmptyRelation {
+            produce_one_row: false,
+            schema: Arc::new(dfschema),
+        });
+
+        let mut unnest_placeholder_columns = vec![];
+        let mut inner_projection_exprs = vec![];
+
+        // unnest(struct_col)
+        let original_expr = Expr::Unnest(Unnest {
+            expr: Box::new(Expr::Column(Column::from_name("struct_col"))),

Review Comment:
   I think you can make this a lot more concise using the 
[expr_fn](https://docs.rs/datafusion/latest/datafusion/logical_expr/expr_fn/index.html)
 API, like:
   
   ```suggestion
               expr: Box::new(col("struct_col"))
   ```
   
   In fact maybe we should add an expr_fn for unnest
   
   ```rust
   /// Call `unnest` on the expression
   fn unnest(arg: Expr) -> Expr { 
     Expr::Unnest(Unnest { expr: Box::new(arg) })
   }
   ```
   
   
     



##########
datafusion/sql/src/utils.rs:
##########
@@ -255,3 +256,188 @@ pub(crate) fn normalize_ident(id: Ident) -> String {
         None => id.value.to_ascii_lowercase(),
     }
 }
+
+/// The context is we want to rewrite unnest() into 
InnerProjection->Unnest->OuterProjection
+/// Given an expression which contains unnest expr as one of its children,
+/// Try transform depends on unnest type
+/// - For list column: unnest(col) with type list -> unnest(col) with type 
list::item
+/// - For struct column: unnest(struct(field1, field2)) -> 
unnest(struct).field1, unnest(struct).field2
+/// The transformed exprs will be used in the outer projection
+pub(crate) fn recursive_transform_unnest(
+    input: &LogicalPlan,
+    unnest_placeholder_columns: &mut Vec<String>,
+    inner_projection_exprs: &mut Vec<Expr>,
+    original_expr: Expr,
+) -> Result<Vec<Expr>> {
+    let mut transform =
+        |unnest_expr: &Expr, expr_in_unnest: &Expr| -> Result<Vec<Expr>> {
+            // Full context, we are trying to plan the execution as 
InnerProjection->Unnest->OuterProjection
+            // inside unnest execution, each column inside the inner projection
+            // will be transformed into new columns. Thus we need to keep 
track of these placeholding column names
+            let placeholder_name = unnest_expr.display_name()?;
+
+            unnest_placeholder_columns.push(placeholder_name.clone());
+            // Add alias for the argument expression, to avoid naming conflicts
+            // with other expressions in the select list. For example: `select 
unnest(col1), col1 from t`.
+            // this extra projection is used to unnest transforming
+            inner_projection_exprs
+                .push(expr_in_unnest.clone().alias(placeholder_name.clone()));
+            let schema = input.schema();
+
+            let (data_type, _) = 
expr_in_unnest.data_type_and_nullable(schema)?;
+
+            let outer_projection_columns =
+                get_unnested_columns(&placeholder_name, &data_type)?;
+            let expr = outer_projection_columns
+                .iter()
+                .map(|col| Expr::Column(col.0.clone()))
+                .collect::<Vec<_>>();
+            Ok(expr)
+        };
+    // expr transformed maybe either the same, or different from the originals 
exprs
+    // for example:
+    // - unnest(struct_col) will be transformed into 
unnest(struct_col).field1, unnest(struct_col).field2
+    // - unnest(array_col) will be transformed into unnest(array_col)
+    // - unnest(array_col) + 1 will be transformed into unnest(array_col) + 1
+
+    // Specifically handle root level unnest expr, this is the only place
+    // unnest on struct can be handled
+    if let Expr::Unnest(Unnest { expr: ref arg }) = original_expr {
+        return transform(&original_expr, arg);
+    }
+    let Transformed {
+            data: transformed_expr,
+            transformed,
+            tnr: _,
+        } = original_expr.transform_up(|expr: Expr| {
+            if let Expr::Unnest(Unnest { expr: ref arg }) = expr {
+                let (data_type, _) = 
expr.data_type_and_nullable(input.schema())?;
+                if let DataType::Struct(_) = data_type {
+                    return internal_err!("unnest on struct can ony be applied 
at the root level of select expression");
+                }
+                let transformed_exprs = transform(&expr, arg)?;
+                Ok(Transformed::yes(transformed_exprs[0].clone()))
+            } else {
+                Ok(Transformed::no(expr))
+            }
+        })?;
+
+    if !transformed {
+        if matches!(&transformed_expr, Expr::Column(_)) {
+            inner_projection_exprs.push(transformed_expr.clone());
+            Ok(vec![transformed_expr])
+        } else {
+            // We need to evaluate the expr in the inner projection,
+            // outer projection just select its name
+            let column_name = transformed_expr.display_name()?;
+            inner_projection_exprs.push(transformed_expr);
+            Ok(vec![Expr::Column(Column::from_name(column_name))])
+        }
+    } else {
+        Ok(vec![transformed_expr])
+    }
+}
+
+// write test for recursive_transform_unnest
+#[cfg(test)]
+mod tests {
+    use std::{ops::Add, sync::Arc};
+
+    use super::*;
+    use arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
+    use arrow_schema::Fields;
+    use datafusion_common::DFSchema;
+    use datafusion_expr::EmptyRelation;
+
+    #[test]
+    fn test_recursive_transform_unnest() -> Result<()> {
+        let schema = Schema::new(vec![
+            Field::new(
+                "struct_col",
+                ArrowDataType::Struct(Fields::from(vec![
+                    Field::new("field1", ArrowDataType::Int32, false),
+                    Field::new("field2", ArrowDataType::Int32, false),
+                ])),
+                false,
+            ),
+            Field::new(
+                "array_col",
+                ArrowDataType::List(Arc::new(Field::new("item", 
DataType::Int64, true))),
+                true,
+            ),
+            Field::new("int_col", ArrowDataType::Int32, false),
+        ]);
+
+        let dfschema = DFSchema::try_from(schema)?;
+
+        let input = LogicalPlan::EmptyRelation(EmptyRelation {
+            produce_one_row: false,
+            schema: Arc::new(dfschema),
+        });
+
+        let mut unnest_placeholder_columns = vec![];
+        let mut inner_projection_exprs = vec![];
+
+        // unnest(struct_col)
+        let original_expr = Expr::Unnest(Unnest {
+            expr: Box::new(Expr::Column(Column::from_name("struct_col"))),
+        });
+        let transformed_exprs = recursive_transform_unnest(
+            &input,
+            &mut unnest_placeholder_columns,
+            &mut inner_projection_exprs,
+            original_expr,
+        )?;
+        assert_eq!(
+            transformed_exprs,
+            vec![
+                Expr::Column(Column::from_name("unnest(struct_col).field1")),
+                Expr::Column(Column::from_name("unnest(struct_col).field2")),
+            ]
+        );
+        assert_eq!(unnest_placeholder_columns, vec!["unnest(struct_col)"]);
+        // still reference struct_col in original schema but with alias,
+        // to avoid colliding with the projection on the column itself if any
+        assert_eq!(
+            inner_projection_exprs,
+            vec![
+                
Expr::Column(Column::from_name("struct_col")).alias("unnest(struct_col)"),
+            ]
+        );
+
+        // unnest(array_col ) + 1
+        let original_expr = Expr::Unnest(Unnest {
+            expr: Box::new(Expr::Column(Column::from_name("array_col"))),
+        })
+        .add(Expr::Literal(ScalarValue::Int64(Some(1))));

Review Comment:
   ```suggestion
           .add(lit(1i64));
   ```
   
   There are similar simplifications we could do below too



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to