This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 03557131d2 feat: extend `unnest` to support Struct datatype (#10429)
03557131d2 is described below

commit 03557131d2c8f2ccc428b21681472db11237212e
Author: Duong Cong Toai <[email protected]>
AuthorDate: Wed May 22 22:53:55 2024 +0200

    feat: extend `unnest` to support Struct datatype (#10429)
    
    * feat: extend unnest for struct
    
    * compile err
    
    * debugging
    
    * finish basic
    
    * chore: complete impl
    
    * chore: clean garbage
    
    * chore: more test
    
    * test: fix df test
    
    * prettify display
    
    * fix unit test
    
    * chore: compile err
    
    * chore: fix physical exec err
    
    * add sqllogic test
    
    * chore: more doc
    
    * chore: refactor
    
    * fix doc
    
    * fmt
    
    * fix doc
    
    * ut for recursive transform unnest
    
    * a small integration test
    
    * fix comment
---
 datafusion/core/src/dataframe/mod.rs               |   6 +-
 datafusion/core/src/physical_planner.rs            |  16 +-
 datafusion/core/tests/data/unnest.json             |   2 +
 datafusion/core/tests/dataframe/mod.rs             |  12 +-
 datafusion/expr/src/expr_fn.rs                     |   9 +-
 datafusion/expr/src/expr_schema.rs                 |   2 +-
 datafusion/expr/src/logical_plan/builder.rs        | 220 +++++++++++++++------
 datafusion/expr/src/logical_plan/display.rs        |  21 +-
 datafusion/expr/src/logical_plan/plan.rs           |  46 ++++-
 datafusion/expr/src/logical_plan/tree_node.rs      |  14 +-
 .../optimizer/src/optimize_projections/mod.rs      |   9 +-
 datafusion/physical-plan/src/unnest.rs             | 183 ++++++++++++-----
 datafusion/sql/src/expr/function.rs                |   6 +-
 datafusion/sql/src/select.rs                       |  62 +++---
 datafusion/sql/src/utils.rs                        | 187 +++++++++++++++++-
 datafusion/sql/tests/sql_integration.rs            |  38 ++++
 datafusion/sqllogictest/test_files/unnest.slt      |  81 ++++++--
 17 files changed, 707 insertions(+), 207 deletions(-)
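
For a quick sense of the API change, here is a minimal sketch of the new
behavior, assembled from the DataFrame doctest updated below (the column
names come from the tests/data/unnest.json fixture added in this commit):

    use datafusion::error::Result;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        // "b" and "c" are list columns, "d" is a struct column {"e": .., "f": ..}
        let df = ctx
            .read_json("tests/data/unnest.json", NdJsonReadOptions::default())
            .await?;
        // Lists are expanded vertically into extra rows; the struct is expanded
        // horizontally into one output column per field (d.e, d.f)
        df.unnest_columns(&["b", "c", "d"])?.show().await?;
        Ok(())
    }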

diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index 04aaf5a890..d4626134ac 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -263,7 +263,7 @@ impl DataFrame {
         self.unnest_columns_with_options(&[column], options)
     }
 
-    /// Expand multiple list columns into a set of rows.
+    /// Expand multiple list/struct columns into a set of rows and new columns.
     ///
     /// See also:
     ///
@@ -277,8 +277,8 @@ impl DataFrame {
     /// # #[tokio::main]
     /// # async fn main() -> Result<()> {
     /// let ctx = SessionContext::new();
-    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
-    /// let df = df.unnest_columns(&["a", "b"])?;
+    /// let df = ctx.read_json("tests/data/unnest.json", NdJsonReadOptions::default()).await?;
+    /// let df = df.unnest_columns(&["b","c","d"])?;
     /// # Ok(())
     /// # }
     /// ```
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 090b1d59d9..bc5818361b 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -49,7 +49,7 @@ use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGro
 use crate::physical_plan::analyze::AnalyzeExec;
 use crate::physical_plan::empty::EmptyExec;
 use crate::physical_plan::explain::ExplainExec;
-use crate::physical_plan::expressions::{Column, PhysicalSortExpr};
+use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::filter::FilterExec;
 use crate::physical_plan::joins::utils as join_utils;
 use crate::physical_plan::joins::{
@@ -1112,24 +1112,18 @@ impl DefaultPhysicalPlanner {
                 Arc::new(GlobalLimitExec::new(input, *skip, *fetch))
             }
             LogicalPlan::Unnest(Unnest {
-                columns,
+                list_type_columns,
+                struct_type_columns,
                 schema,
                 options,
                 ..
             }) => {
                 let input = children.one()?;
-                let column_execs = columns
-                    .iter()
-                    .map(|column| {
-                        schema
-                            .index_of_column(column)
-                            .map(|idx| Column::new(&column.name, idx))
-                    })
-                    .collect::<Result<_>>()?;
                 let schema = SchemaRef::new(schema.as_ref().to_owned().into());
                 Arc::new(UnnestExec::new(
                     input,
-                    column_execs,
+                    list_type_columns.clone(),
+                    struct_type_columns.clone(),
                     schema,
                     options.clone(),
                 ))
diff --git a/datafusion/core/tests/data/unnest.json b/datafusion/core/tests/data/unnest.json
new file mode 100644
index 0000000000..5999171c28
--- /dev/null
+++ b/datafusion/core/tests/data/unnest.json
@@ -0,0 +1,2 @@
+{"a":1, "b":[2.0, 1.3, -6.1], "c":[false, true],"d":{"e":1,"f":2}}
+{"a":2, "b":[3.0, 2.3, -7.1], "c":[false, true]}
diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs
index 009f45b280..9b7cb85614 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -1231,11 +1231,11 @@ async fn unnest_aggregate_columns() -> Result<()> {
         .collect()
         .await?;
     let expected = [
-        r#"+--------------------+"#,
-        r#"| COUNT(shapes.tags) |"#,
-        r#"+--------------------+"#,
-        r#"| 9                  |"#,
-        r#"+--------------------+"#,
+        r#"+-------------+"#,
+        r#"| COUNT(tags) |"#,
+        r#"+-------------+"#,
+        r#"| 9           |"#,
+        r#"+-------------+"#,
     ];
     assert_batches_sorted_eq!(expected, &results);
 
@@ -1384,7 +1384,7 @@ async fn unnest_with_redundant_columns() -> Result<()> {
     let optimized_plan = df.clone().into_optimized_plan()?;
     let expected = vec![
         "Projection: shapes.shape_id [shape_id:UInt32]",
-        "  Unnest: shape_id2 [shape_id:UInt32, shape_id2:UInt32;N]",
+        "  Unnest: lists[shape_id2] structs[] [shape_id:UInt32, 
shape_id2:UInt32;N]",
         "    Aggregate: groupBy=[[shapes.shape_id]], 
aggr=[[ARRAY_AGG(shapes.shape_id) AS shape_id2]] [shape_id:UInt32, 
shape_id2:List(Field { name: \"item\", data_type: UInt32, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} });N]",
         "      TableScan: shapes projection=[shape_id] [shape_id:UInt32]",
     ];
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index 64763a9736..2a2bb75f18 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -19,7 +19,7 @@
 
 use crate::expr::{
     AggregateFunction, BinaryExpr, Cast, Exists, GroupingSet, InList, InSubquery,
-    Placeholder, TryCast,
+    Placeholder, TryCast, Unnest,
 };
 use crate::function::{
     AccumulatorArgs, AccumulatorFactoryFunction, PartitionEvaluatorFactory,
@@ -489,6 +489,13 @@ pub fn when(when: Expr, then: Expr) -> CaseBuilder {
     CaseBuilder::new(None, vec![when], vec![then], None)
 }
 
+/// Create a Unnest expression
+pub fn unnest(expr: Expr) -> Expr {
+    Expr::Unnest(Unnest {
+        expr: Box::new(expr),
+    })
+}
+
 /// Convenience method to create a new user defined scalar function (UDF) with a
 /// specific signature and specific return type.
 ///
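
A short usage sketch of the new `unnest` expr_fn (the column name here is
hypothetical; the struct-field rewrite it triggers is demonstrated by the
recursive_transform_unnest unit test further down):

    use datafusion_expr::{col, unnest};

    // Builds the logical expression `unnest(struct_col)`. During SQL planning
    // it is rewritten into one output column per struct field, e.g.
    // unnest(struct_col).field1, unnest(struct_col).field2.
    let expr = unnest(col("struct_col"));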
diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs
index 8b7f30d245..01c9edff30 100644
--- a/datafusion/expr/src/expr_schema.rs
+++ b/datafusion/expr/src/expr_schema.rs
@@ -121,7 +121,7 @@ impl ExprSchemable for Expr {
                         Ok(field.data_type().clone())
                     }
                     DataType::Struct(_) => {
-                        not_impl_err!("unnest() does not support struct yet")
+                        Ok(arg_data_type)
                     }
                     DataType::Null => {
                         not_impl_err!("unnest() does not support null yet")
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 3b1b1196f1..8483525d7f 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -51,9 +51,9 @@ use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
 use datafusion_common::config::FormatOptions;
 use datafusion_common::display::ToStringifiedPlan;
 use datafusion_common::{
-    get_target_functional_dependencies, not_impl_err, plan_datafusion_err, plan_err,
-    Column, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, TableReference,
-    ToDFSchema, UnnestOptions,
+    get_target_functional_dependencies, internal_err, not_impl_err, plan_datafusion_err,
+    plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue,
+    TableReference, ToDFSchema, UnnestOptions,
 };
 
 /// Default table name for unnamed table
@@ -1592,7 +1592,53 @@ impl TableSource for LogicalTableSource {
 
 /// Create a [`LogicalPlan::Unnest`] plan
 pub fn unnest(input: LogicalPlan, columns: Vec<Column>) -> Result<LogicalPlan> {
-    unnest_with_options(input, columns, UnnestOptions::new())
+    unnest_with_options(input, columns, UnnestOptions::default())
+}
+
+// Based on the data type (struct or a list variant), return the set of
+// columns produced by unnesting the input column.
+// For example, given a column named "a":
+// - List(Element) returns ["a"] with data type Element
+// - Struct(field1, field2) returns ["a.field1", "a.field2"]
+pub fn get_unnested_columns(
+    col_name: &String,
+    data_type: &DataType,
+) -> Result<Vec<(Column, Arc<Field>)>> {
+    let mut qualified_columns = Vec::with_capacity(1);
+
+    match data_type {
+        DataType::List(field)
+        | DataType::FixedSizeList(field, _)
+        | DataType::LargeList(field) => {
+            let new_field = Arc::new(Field::new(
+                col_name.clone(),
+                field.data_type().clone(),
+                // Unnesting may produce NULLs even if the list is not null.
+                // For example: unnest([1], []) -> 1, null
+                true,
+            ));
+            let column = Column::from_name(col_name);
+            // let column = Column::from((None, &new_field));
+            qualified_columns.push((column, new_field));
+        }
+        DataType::Struct(fields) => {
+            qualified_columns.extend(fields.iter().map(|f| {
+                let new_name = format!("{}.{}", col_name, f.name());
+                let column = Column::from_name(&new_name);
+                let new_field = f.as_ref().clone().with_name(new_name);
+                // let column = Column::from((None, &f));
+                (column, Arc::new(new_field))
+            }))
+        }
+        _ => {
+            return internal_err!(
+                "trying to unnest on invalid data type {:?}",
+                data_type
+            );
+        }
+    };
+    Ok(qualified_columns)
 }
 
 /// Create a [`LogicalPlan::Unnest`] plan with options
@@ -1601,41 +1647,59 @@ pub fn unnest_with_options(
     columns: Vec<Column>,
     options: UnnestOptions,
 ) -> Result<LogicalPlan> {
-    // Extract the type of the nested field in the list.
-    let mut unnested_fields: HashMap<usize, _> = HashMap::with_capacity(columns.len());
-    // Add qualifiers to the columns.
-    let mut qualified_columns = Vec::with_capacity(columns.len());
-    for c in &columns {
-        let index = input.schema().index_of_column(c)?;
-        let (unnest_qualifier, unnest_field) = input.schema().qualified_field(index);
-        let unnested_field = match unnest_field.data_type() {
-            DataType::List(field)
-            | DataType::FixedSizeList(field, _)
-            | DataType::LargeList(field) => Arc::new(Field::new(
-                unnest_field.name(),
-                field.data_type().clone(),
-                // Unnesting may produce NULLs even if the list is not null.
-                // For example: unnset([1], []) -> 1, null
-                true,
-            )),
-            _ => {
-                // If the unnest field is not a list type return the input plan.
-                return Ok(input);
-            }
-        };
-        qualified_columns.push(Column::from((unnest_qualifier, &unnested_field)));
-        unnested_fields.insert(index, unnested_field);
-    }
+    let mut list_columns = Vec::with_capacity(columns.len());
+    let mut struct_columns = Vec::with_capacity(columns.len());
+    let column_by_original_index = columns
+        .iter()
+        .map(|c| Ok((input.schema().index_of_column(c)?, c)))
+        .collect::<Result<HashMap<usize, &Column>>>()?;
 
-    // Update the schema with the unnest column types changed to contain the nested types.
     let input_schema = input.schema();
+
+    let mut dependency_indices = vec![];
+    // Transform input schema into new schema
+    // e.g int, unnest([]int), unnest(struct(varchar,varchar))
+    // becomes int, int, varchar, varchar
     let fields = input_schema
         .iter()
         .enumerate()
-        .map(|(index, (q, f))| match unnested_fields.get(&index) {
-            Some(unnested_field) => (q.cloned(), unnested_field.clone()),
-            None => (q.cloned(), f.clone()),
+        .map(|(index, (original_qualifier, original_field))| {
+            match column_by_original_index.get(&index) {
+                Some(&column_to_unnest) => {
+                    let flatten_columns = get_unnested_columns(
+                        &column_to_unnest.name,
+                        original_field.data_type(),
+                    )?;
+                    match original_field.data_type() {
+                        DataType::List(_)
+                        | DataType::FixedSizeList(_, _)
+                        | DataType::LargeList(_) => list_columns.push(index),
+                        DataType::Struct(_) => struct_columns.push(index),
+                        _ => {
+                            panic!(
+                                "not reachable, should be caught by 
get_unnested_columns"
+                            )
+                        }
+                    }
+                    // new columns dependent on the same original index
+                    dependency_indices
+                        .extend(std::iter::repeat(index).take(flatten_columns.len()));
+                    Ok(flatten_columns
+                        .iter()
+                        .map(|col: &(Column, Arc<Field>)| {
+                            (col.0.relation.to_owned(), col.1.to_owned())
+                        })
+                        .collect())
+                }
+                None => {
+                    dependency_indices.push(index);
+                    Ok(vec![(original_qualifier.cloned(), original_field.clone())])
+                }
+            }
         })
+        .collect::<Result<Vec<_>>>()?
+        .into_iter()
+        .flatten()
         .collect::<Vec<_>>();
 
     let metadata = input_schema.metadata().clone();
@@ -1643,9 +1707,13 @@ pub fn unnest_with_options(
     // We can use the existing functional dependencies:
     let deps = input_schema.functional_dependencies().clone();
     let schema = Arc::new(df_schema.with_functional_dependencies(deps)?);
+
     Ok(LogicalPlan::Unnest(Unnest {
         input: Arc::new(input),
-        columns: qualified_columns,
+        exec_columns: columns,
+        list_type_columns: list_columns,
+        struct_type_columns: struct_columns,
+        dependency_indices,
         schema,
         options,
     }))
@@ -2074,13 +2142,13 @@ mod tests {
 
     #[test]
     fn plan_builder_unnest() -> Result<()> {
-        // Unnesting a simple column should return the child plan.
-        let plan = nested_table_scan("test_table")?
-            .unnest_column("scalar")?
-            .build()?;
-
-        let expected = "TableScan: test_table";
-        assert_eq!(expected, format!("{plan:?}"));
+        // Cannot unnest on a scalar column
+        let err = nested_table_scan("test_table")?
+            .unnest_column("scalar")
+            .unwrap_err();
+        assert!(err
+            .to_string()
+            .starts_with("Internal error: trying to unnest on invalid data 
type UInt32"));
 
         // Unnesting the strings list.
         let plan = nested_table_scan("test_table")?
@@ -2088,36 +2156,65 @@ mod tests {
             .build()?;
 
         let expected = "\
-        Unnest: test_table.strings\
+        Unnest: lists[test_table.strings] structs[]\
         \n  TableScan: test_table";
         assert_eq!(expected, format!("{plan:?}"));
 
         // Check unnested field is a scalar
-        let field = plan
-            .schema()
-            .field_with_name(Some(&TableReference::bare("test_table")), "strings")
-            .unwrap();
+        let field = plan.schema().field_with_name(None, "strings").unwrap();
         assert_eq!(&DataType::Utf8, field.data_type());
 
-        // Unnesting multiple fields.
+        // Unnesting the singular struct column results in 2 new columns, one for each subfield
+        let plan = nested_table_scan("test_table")?
+            .unnest_column("struct_singular")?
+            .build()?;
+
+        let expected = "\
+        Unnest: lists[] structs[test_table.struct_singular]\
+        \n  TableScan: test_table";
+        assert_eq!(expected, format!("{plan:?}"));
+
+        for field_name in &["a", "b"] {
+            // Check unnested struct field is a scalar
+            let field = plan
+                .schema()
+                .field_with_name(None, &format!("struct_singular.{}", field_name))
+                .unwrap();
+            assert_eq!(&DataType::UInt32, field.data_type());
+        }
+
+        // Unnesting multiple fields in separate plans
         let plan = nested_table_scan("test_table")?
             .unnest_column("strings")?
             .unnest_column("structs")?
+            .unnest_column("struct_singular")?
             .build()?;
 
         let expected = "\
-        Unnest: test_table.structs\
-        \n  Unnest: test_table.strings\
-        \n    TableScan: test_table";
+        Unnest: lists[] structs[test_table.struct_singular]\
+        \n  Unnest: lists[test_table.structs] structs[]\
+        \n    Unnest: lists[test_table.strings] structs[]\
+        \n      TableScan: test_table";
         assert_eq!(expected, format!("{plan:?}"));
 
         // Check unnested struct list field should be a struct.
-        let field = plan
-            .schema()
-            .field_with_name(Some(&TableReference::bare("test_table")), "structs")
-            .unwrap();
+        let field = plan.schema().field_with_name(None, "structs").unwrap();
         assert!(matches!(field.data_type(), DataType::Struct(_)));
 
+        // Unnesting multiple fields at the same time
+        let cols = vec!["strings", "structs", "struct_singular"]
+            .into_iter()
+            .map(|c| c.into())
+            .collect();
+        let plan = nested_table_scan("test_table")?
+            .unnest_columns_with_options(cols, UnnestOptions::default())?
+            .build()?;
+
+        let expected = "\
+        Unnest: lists[test_table.strings, test_table.structs] structs[test_table.struct_singular]\
+        \n  TableScan: test_table";
+        assert_eq!(expected, format!("{plan:?}"));
+
         // Unnesting missing column should fail.
         let plan = nested_table_scan("test_table")?.unnest_column("missing");
         assert!(plan.is_err());
@@ -2126,8 +2223,9 @@ mod tests {
     }
 
     fn nested_table_scan(table_name: &str) -> Result<LogicalPlanBuilder> {
-        // Create a schema with a scalar field, a list of strings, and a list of structs.
-        let struct_field = Field::new_struct(
+        // Create a schema with a scalar field, a list of strings, a list of structs
+        // and a singular struct
+        let struct_field_in_list = Field::new_struct(
             "item",
             vec![
                 Field::new("a", DataType::UInt32, false),
@@ -2139,7 +2237,15 @@ mod tests {
         let schema = Schema::new(vec![
             Field::new("scalar", DataType::UInt32, false),
             Field::new_list("strings", string_field, false),
-            Field::new_list("structs", struct_field, false),
+            Field::new_list("structs", struct_field_in_list.clone(), false),
+            Field::new(
+                "struct_singular",
+                DataType::Struct(Fields::from(vec![
+                    Field::new("a", DataType::UInt32, false),
+                    Field::new("b", DataType::UInt32, false),
+                ])),
+                false,
+            ),
         ]);
 
         table_scan(Some(table_name), &schema, None)
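
As a companion to the tests above, a minimal sketch of what the new
get_unnested_columns helper returns for a struct column (the field names
are illustrative):

    use std::sync::Arc;
    use arrow::datatypes::{DataType, Field, Fields};
    use datafusion_common::Result;
    use datafusion_expr::builder::get_unnested_columns;

    fn struct_example() -> Result<()> {
        // A struct column "d" with two subfields, like the JSON fixture above
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new("e", DataType::Int64, true),
            Field::new("f", DataType::Int64, true),
        ]));
        // Yields one (Column, FieldRef) pair per subfield: "d.e" and "d.f"
        let cols = get_unnested_columns(&"d".to_string(), &data_type)?;
        assert_eq!(cols.len(), 2);
        assert_eq!(cols[0].0.name, "d.e");
        Ok(())
    }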
diff --git a/datafusion/expr/src/logical_plan/display.rs b/datafusion/expr/src/logical_plan/display.rs
index 3a2ed9ffc2..f3765fb184 100644
--- a/datafusion/expr/src/logical_plan/display.rs
+++ b/datafusion/expr/src/logical_plan/display.rs
@@ -30,7 +30,7 @@ use crate::dml::CopyTo;
 use arrow::datatypes::Schema;
 use datafusion_common::display::GraphvizBuilder;
 use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor};
-use datafusion_common::DataFusionError;
+use datafusion_common::{Column, DataFusionError};
 use serde_json::json;
 
 /// Formats plans with a single line per node. For example:
@@ -638,10 +638,25 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {
                     "Node Type": "DescribeTable"
                 })
             }
-            LogicalPlan::Unnest(Unnest { columns, .. }) => {
+            LogicalPlan::Unnest(Unnest {
+                input: plan,
+                list_type_columns: list_col_indices,
+                struct_type_columns: struct_col_indices,
+                ..
+            }) => {
+                let input_columns = plan.schema().columns();
+                let list_type_columns = list_col_indices
+                    .iter()
+                    .map(|i| &input_columns[*i])
+                    .collect::<Vec<&Column>>();
+                let struct_type_columns = struct_col_indices
+                    .iter()
+                    .map(|i| &input_columns[*i])
+                    .collect::<Vec<&Column>>();
                 json!({
                     "Node Type": "Unnest",
-                    "Column": expr_vec_fmt!(columns),
+                    "ListColumn": expr_vec_fmt!(list_type_columns),
+                    "StructColumn": expr_vec_fmt!(struct_type_columns),
                 })
             }
         }
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 42f3e1f163..97592c05ab 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -667,12 +667,12 @@ impl LogicalPlan {
             LogicalPlan::DescribeTable(_) => Ok(self),
             LogicalPlan::Unnest(Unnest {
                 input,
-                columns,
-                schema: _,
+                exec_columns,
                 options,
+                ..
             }) => {
                 // Update schema with unnested column type.
-                unnest_with_options(unwrap_arc(input), columns, options)
+                unnest_with_options(unwrap_arc(input), exec_columns, options)
             }
         }
     }
@@ -1017,11 +1017,15 @@ impl LogicalPlan {
             }
             LogicalPlan::DescribeTable(_) => Ok(self.clone()),
             LogicalPlan::Unnest(Unnest {
-                columns, options, ..
+                exec_columns: columns,
+                options,
+                ..
             }) => {
                 // Update schema with unnested column type.
                 let input = inputs.swap_remove(0);
-                unnest_with_options(input, columns.clone(), options.clone())
+                let new_plan =
+                    unnest_with_options(input, columns.clone(), options.clone())?;
+                Ok(new_plan)
             }
         }
     }
@@ -1790,8 +1794,23 @@ impl LogicalPlan {
                     LogicalPlan::DescribeTable(DescribeTable { .. }) => {
                         write!(f, "DescribeTable")
                     }
-                    LogicalPlan::Unnest(Unnest { columns, .. }) => {
-                        write!(f, "Unnest: {}", expr_vec_fmt!(columns))
+                    LogicalPlan::Unnest(Unnest {
+                        input: plan,
+                        list_type_columns: list_col_indices,
+                        struct_type_columns: struct_col_indices, .. }) => {
+                        let input_columns = plan.schema().columns();
+                        let list_type_columns = list_col_indices
+                            .iter()
+                            .map(|i| &input_columns[*i])
+                            .collect::<Vec<&Column>>();
+                        let struct_type_columns = struct_col_indices
+                            .iter()
+                            .map(|i| &input_columns[*i])
+                            .collect::<Vec<&Column>>();
+                        // get items from input_columns indexed by list_col_indices
+                        write!(f, "Unnest: lists[{}] structs[{}]", 
+                        expr_vec_fmt!(list_type_columns),
+                        expr_vec_fmt!(struct_type_columns))
                     }
                 }
             }
@@ -2783,8 +2802,17 @@ pub enum Partitioning {
 pub struct Unnest {
     /// The incoming logical plan
     pub input: Arc<LogicalPlan>,
-    /// The columns to unnest
-    pub columns: Vec<Column>,
+    /// Columns to run unnest on; each can be a List or Struct column
+    pub exec_columns: Vec<Column>,
+    /// The indices (in the input schema) of the list-typed columns to unnest
+    pub list_type_columns: Vec<usize>,
+    /// The indices (in the input schema) of the struct-typed columns to unnest
+    pub struct_type_columns: Vec<usize>,
+    /// One entry per output column, recording which column in the input
+    /// schema each output column depends on
+    pub dependency_indices: Vec<usize>,
     /// The output schema, containing the unnested field column.
     pub schema: DFSchemaRef,
     /// Options
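
To make the new bookkeeping concrete, a hypothetical example (following the
"int, unnest([]int), unnest(struct(varchar,varchar)) becomes int, int,
varchar, varchar" comment in unnest_with_options above): for an input schema
[int_col, list_col, struct_col(f1, f2)] with unnest(list_col) and
unnest(struct_col),

    exec_columns        = [list_col, struct_col]
    list_type_columns   = [1]          // index of list_col in the input schema
    struct_type_columns = [2]          // index of struct_col in the input schema
    // output columns: int_col, list_col (unnested), f1, f2
    dependency_indices  = [0, 1, 2, 2] // input column each output depends on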
diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs
index ea1f1c3c85..215b2cb4d4 100644
--- a/datafusion/expr/src/logical_plan/tree_node.rs
+++ b/datafusion/expr/src/logical_plan/tree_node.rs
@@ -313,13 +313,19 @@ impl TreeNode for LogicalPlan {
             }
             LogicalPlan::Unnest(Unnest {
                 input,
-                columns,
+                exec_columns: input_columns,
+                list_type_columns,
+                struct_type_columns,
+                dependency_indices,
                 schema,
                 options,
             }) => rewrite_arc(input, f)?.update_data(|input| {
                 LogicalPlan::Unnest(Unnest {
                     input,
-                    columns,
+                    exec_columns: input_columns,
+                    dependency_indices,
+                    list_type_columns,
+                    struct_type_columns,
                     schema,
                     options,
                 })
@@ -492,7 +498,9 @@ impl LogicalPlan {
             LogicalPlan::TableScan(TableScan { filters, .. }) => {
                 filters.iter().apply_until_stop(f)
             }
-            LogicalPlan::Unnest(Unnest { columns, .. }) => {
+            LogicalPlan::Unnest(unnest) => {
+                let columns = unnest.exec_columns.clone();
+
                 let exprs = columns
                     .iter()
                     .map(|c| Expr::Column(c.clone()))
diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs
index 49b52aa53a..af51814c96 100644
--- a/datafusion/optimizer/src/optimize_projections/mod.rs
+++ b/datafusion/optimizer/src/optimize_projections/mod.rs
@@ -30,6 +30,7 @@ use datafusion_common::{
     JoinType, Result,
 };
 use datafusion_expr::expr::Alias;
+use datafusion_expr::Unnest;
 use datafusion_expr::{
     logical_plan::LogicalPlan, projection_schema, Aggregate, Distinct, Expr, Projection,
     TableScan, Window,
@@ -289,7 +290,6 @@ fn optimize_projections(
         LogicalPlan::Sort(_)
         | LogicalPlan::Filter(_)
         | LogicalPlan::Repartition(_)
-        | LogicalPlan::Unnest(_)
         | LogicalPlan::Union(_)
         | LogicalPlan::SubqueryAlias(_)
         | LogicalPlan::Distinct(Distinct::On(_)) => {
@@ -399,6 +399,13 @@ fn optimize_projections(
                 "OptimizeProjection: should have handled in the match 
statement above"
             );
         }
+        LogicalPlan::Unnest(Unnest {
+            dependency_indices, ..
+        }) => {
+            vec![RequiredIndicies::new_from_indices(
+                dependency_indices.clone(),
+            )]
+        }
     };
 
     // Required indices are currently ordered (child0, child1, ...)
diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs
index 06dd8230d3..a8151fe022 100644
--- a/datafusion/physical-plan/src/unnest.rs
+++ b/datafusion/physical-plan/src/unnest.rs
@@ -23,8 +23,8 @@ use std::{any::Any, sync::Arc};
 use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
 use super::{DisplayAs, ExecutionPlanProperties, PlanProperties};
 use crate::{
-    expressions::Column, DisplayFormatType, Distribution, ExecutionPlan, PhysicalExpr,
-    RecordBatchStream, SendableRecordBatchStream,
+    DisplayFormatType, Distribution, ExecutionPlan, RecordBatchStream,
+    SendableRecordBatchStream,
 };
 
 use arrow::array::{
@@ -36,18 +36,24 @@ use arrow::compute::kernels::zip::zip;
 use arrow::compute::{cast, is_not_null, kernels, sum};
 use arrow::datatypes::{DataType, Int64Type, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
-use arrow_array::{Int64Array, Scalar};
+use arrow_array::{Int64Array, Scalar, StructArray};
 use arrow_ord::cmp::lt;
-use datafusion_common::{exec_datafusion_err, exec_err, Result, UnnestOptions};
+use datafusion_common::{
+    exec_datafusion_err, exec_err, internal_err, Result, UnnestOptions,
+};
 use datafusion_execution::TaskContext;
+use datafusion_expr::ColumnarValue;
 use datafusion_physical_expr::EquivalenceProperties;
 
 use async_trait::async_trait;
 use futures::{Stream, StreamExt};
+use hashbrown::HashSet;
 use log::trace;
 
-/// Unnest the given columns by joining the row with each value in the
-/// nested type.
+/// Unnest the given columns (either with type struct or list).
+/// For list unnesting, each row is vertically transformed into multiple rows.
+/// For struct unnesting, each column is horizontally transformed into multiple columns.
+/// Thus the original RecordBatch with dimension (n x m) may end up with a new dimension (n' x m')
 ///
 /// See [`UnnestOptions`] for more details and an example.
 #[derive(Debug)]
@@ -56,8 +62,10 @@ pub struct UnnestExec {
     input: Arc<dyn ExecutionPlan>,
     /// The schema once the unnest is applied
     schema: SchemaRef,
-    /// The unnest columns
-    columns: Vec<Column>,
+    /// indices of the list-typed columns in the input schema
+    list_column_indices: Vec<usize>,
+    /// indices of the struct-typed columns in the input schema
+    struct_column_indices: Vec<usize>,
     /// Options
     options: UnnestOptions,
     /// Execution metrics
@@ -70,15 +78,18 @@ impl UnnestExec {
     /// Create a new [UnnestExec].
     pub fn new(
         input: Arc<dyn ExecutionPlan>,
-        columns: Vec<Column>,
+        list_column_indices: Vec<usize>,
+        struct_column_indices: Vec<usize>,
         schema: SchemaRef,
         options: UnnestOptions,
     ) -> Self {
         let cache = Self::compute_properties(&input, schema.clone());
+
         UnnestExec {
             input,
             schema,
-            columns,
+            list_column_indices,
+            struct_column_indices,
             options,
             metrics: Default::default(),
             cache,
@@ -137,7 +148,8 @@ impl ExecutionPlan for UnnestExec {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         Ok(Arc::new(UnnestExec::new(
             children[0].clone(),
-            self.columns.clone(),
+            self.list_column_indices.clone(),
+            self.struct_column_indices.clone(),
             self.schema.clone(),
             self.options.clone(),
         )))
@@ -158,7 +170,8 @@ impl ExecutionPlan for UnnestExec {
         Ok(Box::pin(UnnestStream {
             input,
             schema: self.schema.clone(),
-            columns: self.columns.clone(),
+            list_type_columns: self.list_column_indices.clone(),
+            struct_column_indices: self.struct_column_indices.iter().copied().collect(),
             options: self.options.clone(),
             metrics,
         }))
@@ -214,7 +227,8 @@ struct UnnestStream {
     /// Unnested schema
     schema: Arc<Schema>,
     /// The unnest columns
-    columns: Vec<Column>,
+    list_type_columns: Vec<usize>,
+    struct_column_indices: HashSet<usize>,
     /// Options
     options: UnnestOptions,
     /// Metrics
@@ -251,8 +265,13 @@ impl UnnestStream {
             .map(|maybe_batch| match maybe_batch {
                 Some(Ok(batch)) => {
                     let timer = self.metrics.elapsed_compute.timer();
-                    let result =
-                        build_batch(&batch, &self.schema, &self.columns, &self.options);
+                    let result = build_batch(
+                        &batch,
+                        &self.schema,
+                        &self.list_type_columns,
+                        &self.struct_column_indices,
+                        &self.options,
+                    );
                     self.metrics.input_batches.add(1);
                     self.metrics.input_rows.add(batch.num_rows());
                     if let Ok(ref batch) = result {
@@ -279,48 +298,105 @@ impl UnnestStream {
     }
 }
 
-/// For each row in a `RecordBatch`, some list columns need to be unnested.
-/// We will expand the values in each list into multiple rows,
+/// Given a set of struct column indices to flatten,
+/// try converting each such column in the input into multiple subfield columns.
+/// For example, with a column
+/// struct_col: [a: struct(item: int, name: string), b: int]
+/// and a batch
+/// {a: {item: 1, name: "a"}, b: 2},
+/// {a: {item: 3, name: "b"}, b: 4}
+/// will be converted into
+/// {a.item: 1, a.name: "a", b: 2},
+/// {a.item: 3, a.name: "b", b: 4}
+fn flatten_struct_cols(
+    input_batch: &[Arc<dyn Array>],
+    schema: &SchemaRef,
+    struct_column_indices: &HashSet<usize>,
+) -> Result<RecordBatch> {
+    // horizontal expansion because of struct unnest
+    let columns_expanded = input_batch
+        .iter()
+        .enumerate()
+        .map(|(idx, column_data)| match struct_column_indices.get(&idx) {
+            Some(_) => match column_data.data_type() {
+                DataType::Struct(_) => {
+                    let struct_arr =
+                        column_data.as_any().downcast_ref::<StructArray>().unwrap();
+                    Ok(struct_arr.columns().to_vec())
+                }
+                data_type => internal_err!(
+                    "expecting column {} from input plan to be a struct, got 
{:?}",
+                    idx,
+                    data_type
+                ),
+            },
+            None => Ok(vec![column_data.clone()]),
+        })
+        .collect::<Result<Vec<_>>>()?
+        .into_iter()
+        .flatten()
+        .collect();
+    Ok(RecordBatch::try_new(schema.clone(), columns_expanded)?)
+}
+
+/// For each row in a `RecordBatch`, some list/struct columns need to be unnested.
+/// - For list columns: We will expand the values in each list into multiple rows,
 /// taking the longest length among these lists, and shorter lists are padded with NULLs.
-//
+/// - For struct columns: We will expand the struct columns into multiple subfield columns.
 /// For columns that don't need to be unnested, repeat their values until reaching the longest length.
 fn build_batch(
     batch: &RecordBatch,
     schema: &SchemaRef,
-    columns: &[Column],
+    list_type_columns: &[usize],
+    struct_column_indices: &HashSet<usize>,
     options: &UnnestOptions,
 ) -> Result<RecordBatch> {
-    let list_arrays: Vec<ArrayRef> = columns
-        .iter()
-        .map(|column| column.evaluate(batch)?.into_array(batch.num_rows()))
-        .collect::<Result<_>>()?;
+    let transformed = match list_type_columns.len() {
+        0 => flatten_struct_cols(batch.columns(), schema, struct_column_indices),
+        _ => {
+            let list_arrays: Vec<ArrayRef> = list_type_columns
+                .iter()
+                .map(|index| {
+                    ColumnarValue::Array(batch.column(*index).clone())
+                        .into_array(batch.num_rows())
+                })
+                .collect::<Result<_>>()?;
+
+            let longest_length = find_longest_length(&list_arrays, options)?;
+            let unnested_length = longest_length.as_primitive::<Int64Type>();
+            let total_length = if unnested_length.is_empty() {
+                0
+            } else {
+                sum(unnested_length).ok_or_else(|| {
+                    exec_datafusion_err!("Failed to calculate the total 
unnested length")
+                })? as usize
+            };
+            if total_length == 0 {
+                return Ok(RecordBatch::new_empty(schema.clone()));
+            }
 
-    let longest_length = find_longest_length(&list_arrays, options)?;
-    let unnested_length = longest_length.as_primitive::<Int64Type>();
-    let total_length = if unnested_length.is_empty() {
-        0
-    } else {
-        sum(unnested_length).ok_or_else(|| {
-            exec_datafusion_err!("Failed to calculate the total unnested 
length")
-        })? as usize
+            // Unnest all the list arrays
+            let unnested_arrays =
+                unnest_list_arrays(&list_arrays, unnested_length, total_length)?;
+            let unnested_array_map: HashMap<_, _> = unnested_arrays
+                .into_iter()
+                .zip(list_type_columns.iter())
+                .map(|(array, column)| (*column, array))
+                .collect();
+
+            // Create the take indices array for other columns
+            let take_indicies = create_take_indicies(unnested_length, total_length);
+
+            // vertical expansion because of list unnest
+            let ret = flatten_list_cols_from_indices(
+                batch,
+                &unnested_array_map,
+                &take_indicies,
+            )?;
+            flatten_struct_cols(&ret, schema, struct_column_indices)
+        }
     };
-    if total_length == 0 {
-        return Ok(RecordBatch::new_empty(schema.clone()));
-    }
-
-    // Unnest all the list arrays
-    let unnested_arrays =
-        unnest_list_arrays(&list_arrays, unnested_length, total_length)?;
-    let unnested_array_map: HashMap<_, _> = unnested_arrays
-        .into_iter()
-        .zip(columns.iter())
-        .map(|(array, column)| (column.index(), array))
-        .collect();
-
-    // Create the take indices array for other columns
-    let take_indicies = create_take_indicies(unnested_length, total_length);
-
-    batch_from_indices(batch, schema, &unnested_array_map, &take_indicies)
+    transformed
 }
 
 /// Find the longest list length among the given list arrays for each row.
@@ -505,7 +581,8 @@ fn unnest_list_array(
     )?)
 }
 
-/// Creates take indicies that will be used to expand all columns except for the unnest [`columns`](UnnestExec::columns).
+/// Creates take indices that will be used to expand all columns except for the list type
+/// [`columns`](UnnestExec::list_column_indices) that are being unnested.
 /// Every column value needs to be repeated multiple times according to the length array.
 ///
 /// If the length array looks like this:
@@ -568,12 +645,11 @@ fn create_take_indicies(
 /// c2: 'a', 'b', 'c', 'c', 'c', null, 'd', 'd'
 /// ```
 ///
-fn batch_from_indices(
+fn flatten_list_cols_from_indices(
     batch: &RecordBatch,
-    schema: &SchemaRef,
     unnested_list_arrays: &HashMap<usize, ArrayRef>,
     indices: &PrimitiveArray<Int64Type>,
-) -> Result<RecordBatch> {
+) -> Result<Vec<Arc<dyn Array>>> {
     let arrays = batch
         .columns()
         .iter()
@@ -583,8 +659,7 @@ fn batch_from_indices(
             None => Ok(kernels::take::take(arr, indices, None)?),
         })
         .collect::<Result<Vec<_>>>()?;
-
-    Ok(RecordBatch::try_new(schema.clone(), arrays.to_vec())?)
+    Ok(arrays)
 }
 
 #[cfg(test)]
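
The core of the struct flattening above is the downcast-and-split in
flatten_struct_cols; a self-contained sketch of just that step (the array
contents are hypothetical, mirroring the function's doc-comment example, and
assume a recent arrow-rs):

    use std::sync::Arc;
    use arrow_array::{ArrayRef, Int32Array, StringArray, StructArray};
    use arrow_schema::{DataType, Field};

    // A two-row struct column a: {item: Int32, name: Utf8}
    let struct_col: ArrayRef = Arc::new(StructArray::from(vec![
        (
            Arc::new(Field::new("item", DataType::Int32, false)),
            Arc::new(Int32Array::from(vec![1, 3])) as ArrayRef,
        ),
        (
            Arc::new(Field::new("name", DataType::Utf8, false)),
            Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
        ),
    ]));
    // The horizontal expansion is a downcast plus columns(): each subfield
    // becomes its own top-level column (a.item, a.name)
    let struct_arr = struct_col.as_any().downcast_ref::<StructArray>().unwrap();
    let expanded: Vec<ArrayRef> = struct_arr.columns().to_vec();
    assert_eq!(expanded.len(), 2);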
diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs
index 7abe5ecdae..1f8492b9ba 100644
--- a/datafusion/sql/src/expr/function.rs
+++ b/datafusion/sql/src/expr/function.rs
@@ -358,10 +358,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         match arg.get_type(schema)? {
             DataType::List(_)
             | DataType::LargeList(_)
-            | DataType::FixedSizeList(_, _) => Ok(()),
-            DataType::Struct(_) => {
-                not_impl_err!("unnest() does not support struct yet")
-            }
+            | DataType::FixedSizeList(_, _)
+            | DataType::Struct(_) => Ok(()),
             DataType::Null => {
                 not_impl_err!("unnest() does not support null yet")
             }
diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs
index 730e84cd09..d2cd1bcf3a 100644
--- a/datafusion/sql/src/select.rs
+++ b/datafusion/sql/src/select.rs
@@ -20,14 +20,14 @@ use std::sync::Arc;
 
 use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
 use crate::utils::{
-    check_columns_satisfy_exprs, extract_aliases, rebase_expr, resolve_aliases_to_exprs,
-    resolve_columns, resolve_positions_to_exprs,
+    check_columns_satisfy_exprs, extract_aliases, rebase_expr,
+    recursive_transform_unnest, resolve_aliases_to_exprs, resolve_columns,
+    resolve_positions_to_exprs,
 };
 
-use datafusion_common::tree_node::{Transformed, TreeNode};
 use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
 use datafusion_common::{Column, UnnestOptions};
-use datafusion_expr::expr::{Alias, Unnest};
+use datafusion_expr::expr::Alias;
 use datafusion_expr::expr_rewriter::{
     normalize_col, normalize_col_with_schemas_and_ambiguity_check, normalize_cols,
 };
@@ -298,47 +298,29 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         select_exprs: Vec<Expr>,
     ) -> Result<LogicalPlan> {
         let mut unnest_columns = vec![];
+        // columns used for the projection before the unnest happens,
+        // including both non-unnest and unnest columns
         let mut inner_projection_exprs = vec![];
 
-        let outer_projection_exprs = select_exprs
+        // exprs returned here may differ from the originals in inner_projection_exprs
+        // for example:
+        // - unnest(struct_col) will be transformed into unnest(struct_col).field1, unnest(struct_col).field2
+        // - unnest(array_col) will be transformed into unnest(array_col).element
+        // - unnest(array_col) + 1 will be transformed into unnest(array_col).element + 1
+        let outer_projection_exprs: Vec<Expr> = select_exprs
             .into_iter()
             .map(|expr| {
-                let Transformed {
-                    data: transformed_expr,
-                    transformed,
-                    tnr: _,
-                } = expr.transform_up(|expr: Expr| {
-                    if let Expr::Unnest(Unnest { expr: ref arg }) = expr {
-                        let column_name = expr.display_name()?;
-                        unnest_columns.push(column_name.clone());
-                        // Add alias for the argument expression, to avoid naming conflicts with other expressions
-                        // in the select list. For example: `select unnest(col1), col1 from t`.
-                        inner_projection_exprs
-                            .push(arg.clone().alias(column_name.clone()));
-                        Ok(Transformed::yes(Expr::Column(Column::from_name(
-                            column_name,
-                        ))))
-                    } else {
-                        Ok(Transformed::no(expr))
-                    }
-                })?;
-
-                if !transformed {
-                    if matches!(&transformed_expr, Expr::Column(_)) {
-                        inner_projection_exprs.push(transformed_expr.clone());
-                        Ok(transformed_expr)
-                    } else {
-                        // We need to evaluate the expr in the inner projection,
-                        // outer projection just select its name
-                        let column_name = transformed_expr.display_name()?;
-                        inner_projection_exprs.push(transformed_expr);
-                        Ok(Expr::Column(Column::from_name(column_name)))
-                    }
-                } else {
-                    Ok(transformed_expr)
-                }
+                recursive_transform_unnest(
+                    &input,
+                    &mut unnest_columns,
+                    &mut inner_projection_exprs,
+                    expr,
+                )
             })
-            .collect::<Result<Vec<_>>>()?;
+            .collect::<Result<Vec<_>>>()?
+            .into_iter()
+            .flatten()
+            .collect();
 
         // Do the final projection
         if unnest_columns.is_empty() {
diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs
index 2c50d3af1f..4ae486ef1a 100644
--- a/datafusion/sql/src/utils.rs
+++ b/datafusion/sql/src/utils.rs
@@ -26,9 +26,10 @@ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::{
     exec_err, internal_err, plan_err, Column, DataFusionError, Result, ScalarValue,
 };
-use datafusion_expr::expr::{Alias, GroupingSet, WindowFunction};
+use datafusion_expr::builder::get_unnested_columns;
+use datafusion_expr::expr::{Alias, GroupingSet, Unnest, WindowFunction};
 use datafusion_expr::utils::{expr_as_column_expr, find_column_exprs};
-use datafusion_expr::{expr_vec_fmt, Expr, LogicalPlan};
+use datafusion_expr::{expr_vec_fmt, Expr, ExprSchemable, LogicalPlan};
 use sqlparser::ast::Ident;
 
 /// Make a best-effort attempt at resolving all columns in the expression tree
@@ -255,3 +256,185 @@ pub(crate) fn normalize_ident(id: Ident) -> String {
         None => id.value.to_ascii_lowercase(),
     }
 }
+
+/// The context is that we want to rewrite unnest() into InnerProjection->Unnest->OuterProjection.
+/// Given an expression which contains an unnest expr as one of its children,
+/// try to transform it depending on the unnest type:
+/// - For a list column: unnest(col) with type list -> unnest(col) with type list::item
+/// - For a struct column: unnest(struct(field1, field2)) -> unnest(struct).field1, unnest(struct).field2
+/// The transformed exprs will be used in the outer projection.
+pub(crate) fn recursive_transform_unnest(
+    input: &LogicalPlan,
+    unnest_placeholder_columns: &mut Vec<String>,
+    inner_projection_exprs: &mut Vec<Expr>,
+    original_expr: Expr,
+) -> Result<Vec<Expr>> {
+    let mut transform =
+        |unnest_expr: &Expr, expr_in_unnest: &Expr| -> Result<Vec<Expr>> {
+            // Full context: we are trying to plan the execution as InnerProjection->Unnest->OuterProjection.
+            // Inside the unnest execution, each column inside the inner projection
+            // will be transformed into new columns. Thus we need to keep track of these placeholder column names
+            let placeholder_name = unnest_expr.display_name()?;
+
+            unnest_placeholder_columns.push(placeholder_name.clone());
+            // Add alias for the argument expression, to avoid naming conflicts
+            // with other expressions in the select list. For example: `select unnest(col1), col1 from t`.
+            // This extra projection is used for the unnest transformation
+            inner_projection_exprs
+                .push(expr_in_unnest.clone().alias(placeholder_name.clone()));
+            let schema = input.schema();
+
+            let (data_type, _) = expr_in_unnest.data_type_and_nullable(schema)?;
+
+            let outer_projection_columns =
+                get_unnested_columns(&placeholder_name, &data_type)?;
+            let expr = outer_projection_columns
+                .iter()
+                .map(|col| Expr::Column(col.0.clone()))
+                .collect::<Vec<_>>();
+            Ok(expr)
+        };
+    // The transformed exprs may be either the same as, or different from, the original exprs
+    // for example:
+    // - unnest(struct_col) will be transformed into unnest(struct_col).field1, unnest(struct_col).field2
+    // - unnest(array_col) will be transformed into unnest(array_col)
+    // - unnest(array_col) + 1 will be transformed into unnest(array_col) + 1
+
+    // Specifically handle root level unnest expr, this is the only place
+    // unnest on struct can be handled
+    if let Expr::Unnest(Unnest { expr: ref arg }) = original_expr {
+        return transform(&original_expr, arg);
+    }
+    let Transformed {
+            data: transformed_expr,
+            transformed,
+            tnr: _,
+        } = original_expr.transform_up(|expr: Expr| {
+            if let Expr::Unnest(Unnest { expr: ref arg }) = expr {
+                let (data_type, _) = expr.data_type_and_nullable(input.schema())?;
+                if let DataType::Struct(_) = data_type {
+                    return internal_err!("unnest on struct can ony be applied 
at the root level of select expression");
+                }
+                let transformed_exprs = transform(&expr, arg)?;
+                Ok(Transformed::yes(transformed_exprs[0].clone()))
+            } else {
+                Ok(Transformed::no(expr))
+            }
+        })?;
+
+    if !transformed {
+        if matches!(&transformed_expr, Expr::Column(_)) {
+            inner_projection_exprs.push(transformed_expr.clone());
+            Ok(vec![transformed_expr])
+        } else {
+            // We need to evaluate the expr in the inner projection,
+            // outer projection just select its name
+            let column_name = transformed_expr.display_name()?;
+            inner_projection_exprs.push(transformed_expr);
+            Ok(vec![Expr::Column(Column::from_name(column_name))])
+        }
+    } else {
+        Ok(vec![transformed_expr])
+    }
+}
+
+// Test for recursive_transform_unnest
+#[cfg(test)]
+mod tests {
+    use std::{ops::Add, sync::Arc};
+
+    use arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
+    use arrow_schema::Fields;
+    use datafusion_common::{DFSchema, Result};
+    use datafusion_expr::{col, lit, unnest, EmptyRelation, LogicalPlan};
+
+    use crate::utils::recursive_transform_unnest;
+
+    #[test]
+    fn test_recursive_transform_unnest() -> Result<()> {
+        let schema = Schema::new(vec![
+            Field::new(
+                "struct_col",
+                ArrowDataType::Struct(Fields::from(vec![
+                    Field::new("field1", ArrowDataType::Int32, false),
+                    Field::new("field2", ArrowDataType::Int32, false),
+                ])),
+                false,
+            ),
+            Field::new(
+                "array_col",
+                ArrowDataType::List(Arc::new(Field::new(
+                    "item",
+                    ArrowDataType::Int64,
+                    true,
+                ))),
+                true,
+            ),
+            Field::new("int_col", ArrowDataType::Int32, false),
+        ]);
+
+        let dfschema = DFSchema::try_from(schema)?;
+
+        let input = LogicalPlan::EmptyRelation(EmptyRelation {
+            produce_one_row: false,
+            schema: Arc::new(dfschema),
+        });
+
+        let mut unnest_placeholder_columns = vec![];
+        let mut inner_projection_exprs = vec![];
+
+        // unnest(struct_col)
+        let original_expr = unnest(col("struct_col"));
+        let transformed_exprs = recursive_transform_unnest(
+            &input,
+            &mut unnest_placeholder_columns,
+            &mut inner_projection_exprs,
+            original_expr,
+        )?;
+        assert_eq!(
+            transformed_exprs,
+            vec![
+                col("unnest(struct_col).field1"),
+                col("unnest(struct_col).field2"),
+            ]
+        );
+        assert_eq!(unnest_placeholder_columns, vec!["unnest(struct_col)"]);
+        // still reference struct_col in original schema but with alias,
+        // to avoid colliding with the projection on the column itself if any
+        assert_eq!(
+            inner_projection_exprs,
+            vec![col("struct_col").alias("unnest(struct_col)"),]
+        );
+
+        // unnest(array_col) + 1
+        let original_expr = unnest(col("array_col")).add(lit(1i64));
+        let transformed_exprs = recursive_transform_unnest(
+            &input,
+            &mut unnest_placeholder_columns,
+            &mut inner_projection_exprs,
+            original_expr,
+        )?;
+        assert_eq!(
+            unnest_placeholder_columns,
+            vec!["unnest(struct_col)", "unnest(array_col)"]
+        );
+        // only transform the unnest children
+        assert_eq!(
+            transformed_exprs,
+            vec![col("unnest(array_col)").add(lit(1i64))]
+        );
+
+        // keep appending to the current vector
+        // still reference array_col in original schema but with alias,
+        // to avoid colliding with the projection on the column itself if any
+        assert_eq!(
+            inner_projection_exprs,
+            vec![
+                col("struct_col").alias("unnest(struct_col)"),
+                col("array_col").alias("unnest(array_col)")
+            ]
+        );
+
+        Ok(())
+    }
+}
diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs
index bbedaca6a8..cca96b6eb9 100644
--- a/datafusion/sql/tests/sql_integration.rs
+++ b/datafusion/sql/tests/sql_integration.rs
@@ -2905,6 +2905,21 @@ impl ContextProvider for MockContextProvider {
                 Field::new("Id", DataType::UInt32, false),
                 Field::new("lower", DataType::UInt32, false),
             ])),
+            "unnest_table" => Ok(Schema::new(vec![
+                Field::new(
+                    "array_col",
+                    DataType::List(Arc::new(Field::new("item", 
DataType::Int64, true))),
+                    false,
+                ),
+                Field::new(
+                    "struct_col",
+                    DataType::Struct(Fields::from(vec![
+                        Field::new("field1", DataType::Int64, true),
+                        Field::new("field2", DataType::Utf8, true),
+                    ])),
+                    false,
+                ),
+            ])),
             _ => plan_err!("No table named: {} found", name.table()),
         };
 
@@ -4715,6 +4730,29 @@ fn roundtrip_crossjoin() -> Result<()> {
     Ok(())
 }
 
+#[test]
+fn test_unnest_logical_plan() -> Result<()> {
+    let query = "select unnest(struct_col), unnest(array_col), struct_col, 
array_col from unnest_table";
+
+    let dialect = GenericDialect {};
+    let statement = Parser::new(&dialect)
+        .try_with_sql(query)?
+        .parse_statement()?;
+
+    let context = MockContextProvider::default();
+    let sql_to_rel = SqlToRel::new(&context);
+    let plan = sql_to_rel.sql_statement_to_plan(statement).unwrap();
+
+    let expected = "Projection: unnest(unnest_table.struct_col).field1, 
unnest(unnest_table.struct_col).field2, unnest(unnest_table.array_col), 
unnest_table.struct_col, unnest_table.array_col\
+        \n  Unnest: lists[unnest(unnest_table.array_col)] 
structs[unnest(unnest_table.struct_col)]\
+        \n    Projection: unnest_table.struct_col AS 
unnest(unnest_table.struct_col), unnest_table.array_col AS 
unnest(unnest_table.array_col), unnest_table.struct_col, unnest_table.array_col\
+        \n      TableScan: unnest_table";
+
+    assert_eq!(format!("{plan:?}"), expected);
+
+    Ok(())
+}
+
 #[cfg(test)]
 #[ctor::ctor]
 fn init() {
diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt
index ca7e73cb87..7b7249d6d5 100644
--- a/datafusion/sqllogictest/test_files/unnest.slt
+++ b/datafusion/sqllogictest/test_files/unnest.slt
@@ -22,12 +22,19 @@
 statement ok
 CREATE TABLE unnest_table
 AS VALUES
-    ([1,2,3], [7], 1, [13, 14]),
-    ([4,5], [8,9,10], 2, [15, 16]),
-    ([6], [11,12], 3, null),
-    ([12], [null, 42, null], null, null),
+    ([1,2,3], [7], 1, [13, 14], struct(1,2)),
+    ([4,5], [8,9,10], 2, [15, 16], struct(3,4)),
+    ([6], [11,12], 3, null, null),
+    ([12], [null, 42, null], null, null, struct(7,8)),
     -- null array to verify the `preserve_nulls` option
-    (null, null, 4, [17, 18])
+    (null, null, 4, [17, 18], null)
+;
+
+statement ok
+CREATE TABLE nested_unnest_table
+AS VALUES 
+    (struct('a', 'b', struct('c')), (struct('a', 'b', [10,20]))), 
+    (struct('d', 'e', struct('f')), (struct('x', 'y', [30,40, 50])))
 ;
 
 ## Basic unnest expression in select list
@@ -38,7 +45,13 @@ select unnest([1,2,3]);
 2
 3
 
-## Basic unnest expression in from clause
+## Basic unnest struct expression in select list
+query III
+select unnest(struct(1,2,3));
+----
+1 2 3
+
+## Basic unnest list expression in from clause
 query I
 select * from unnest([1,2,3]);
 ----
@@ -46,6 +59,20 @@ select * from unnest([1,2,3]);
 2
 3
 
+## Basic unnest struct expression in from clause
+query III
+select * from unnest(struct(1,2,3));
+----
+1 2 3
+
+## Multiple unnest expressions in from clause
+query IIII
+select * from unnest(struct(1,2,3)),unnest([4,5,6]);
+----
+1 2 3 4
+1 2 3 5
+1 2 3 6
+
 
 ## Unnest null in select list
 query error DataFusion error: This feature is not implemented: unnest\(\) does not support null yet
@@ -145,10 +172,6 @@ select array_remove(column1, 4), unnest(column2), column3 
* 10 from unnest_table
 [12] NULL NULL
 
 
-## Unnest column with scalars
-query error DataFusion error: Error during planning: unnest\(\) can only be applied to array, struct and null
-select unnest(column3) from unnest_table;
-
 ## Unnest doesn't work with untyped nulls
 query error DataFusion error: This feature is not implemented: unnest\(\) does not support null yet
 select unnest(null) from unnest_table;
@@ -233,12 +256,16 @@ select * from unnest([], NULL::int[]);
 
 
 ## Unnest struct expression in select list
-query error DataFusion error: This feature is not implemented: unnest\(\) does not support struct yet
+query ?
 select unnest(struct(null));
+----
+NULL
 
 ## Unnest struct expression in from clause
-query error DataFusion error: This feature is not implemented: unnest\(\) does not support struct yet
+query ?
 select * from unnest(struct(null));
+----
+NULL
 
 
 ## Unnest array expression
@@ -288,6 +315,18 @@ select unnest(array_remove(column1, 12)) from unnest_table;
 5
 6
 
+## unnest struct-typed column and list-typed column at the same time
+query I?II?
+select unnest(column1), column1, unnest(column5), column5 from unnest_table;
+----
+1 [1, 2, 3] 1 2 {c0: 1, c1: 2}
+2 [1, 2, 3] 1 2 {c0: 1, c1: 2}
+3 [1, 2, 3] 1 2 {c0: 1, c1: 2}
+4 [4, 5] 3 4 {c0: 3, c1: 4}
+5 [4, 5] 3 4 {c0: 3, c1: 4}
+6 [6] NULL NULL NULL
+12 [12] 7 8 {c0: 7, c1: 8}
+
 
 ## Unnest in from clause with alias
 query I
@@ -383,8 +422,26 @@ select unnest(array_remove(column1, 3)) - 1 as c1, column3 from unnest_table;
 5 3
 11 NULL
 
+## unnest for nested struct(struct)
+query TT?
+select unnest(column1) from nested_unnest_table;
+----
+a b {c0: c}
+d e {c0: f}
+
+## unnest for nested(struct(list))
+query TT?
+select unnest(column2) from nested_unnest_table;
+----
+a b [10, 20]
+x y [30, 40, 50]
+
 query error DataFusion error: type_coercion\ncaused by\nThis feature is not implemented: Unnest should be rewritten to LogicalPlan::Unnest before type coercion
 select sum(unnest(generate_series(1,10)));
 
+## TODO: support unnest as a child expr
+query error DataFusion error: Internal error: unnest on struct can only be applied at the root level of select expression
+select arrow_typeof(unnest(column5)) from unnest_table;
+
 statement ok
 drop table unnest_table;

