This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 24197d706e Unnest with single expression (#9069)
24197d706e is described below

commit 24197d706e5d899973f77639aa07058fc51fd33f
Author: Jay Zhan <[email protected]>
AuthorDate: Sun Feb 4 21:24:42 2024 +0800

    Unnest with single expression (#9069)
    
    * introduce basic unnest
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * proto
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * cleanup
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * cleanup
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * cleanup
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * rename
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * change panic to internal error
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * cleanup
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * typo
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * more err handling
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * fix reimported
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * comments and fix typo
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * Add tests for nulls
    
    ---------
    
    Signed-off-by: jayzhan211 <[email protected]>
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/core/src/datasource/listing/helpers.rs  |   1 +
 datafusion/core/src/physical_planner.rs            |   5 +
 datafusion/expr/src/expr.rs                        |  13 +++
 datafusion/expr/src/expr_rewriter/mod.rs           |  12 ++-
 datafusion/expr/src/expr_schema.rs                 |  10 +-
 datafusion/expr/src/tree_node/expr.rs              |   6 +-
 datafusion/expr/src/utils.rs                       |   3 +-
 datafusion/optimizer/src/analyzer/type_coercion.rs |   3 +
 datafusion/optimizer/src/push_down_filter.rs       |   1 +
 .../src/simplify_expressions/expr_simplifier.rs    |   1 +
 datafusion/proto/proto/datafusion.proto            |   6 ++
 datafusion/proto/src/generated/pbjson.rs           | 104 +++++++++++++++++++++
 datafusion/proto/src/generated/prost.rs            |  10 +-
 datafusion/proto/src/logical_plan/from_proto.rs    |   9 ++
 datafusion/proto/src/logical_plan/to_proto.rs      |  14 ++-
 .../proto/tests/cases/roundtrip_logical_plan.rs    |  12 ++-
 datafusion/sql/src/expr/function.rs                |  54 ++++++++++-
 datafusion/sql/src/select.rs                       |  41 +++++++-
 datafusion/sqllogictest/test_files/unnest.slt      |  97 +++++++++++++++++++
 19 files changed, 385 insertions(+), 17 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/helpers.rs 
b/datafusion/core/src/datasource/listing/helpers.rs
index a03bcec7ab..077356b716 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -126,6 +126,7 @@ pub fn expr_applicable_for_cols(col_names: &[String], expr: 
&Expr) -> bool {
             | Expr::Sort { .. }
             | Expr::WindowFunction { .. }
             | Expr::Wildcard { .. }
+            | Expr::Unnest { .. }
             | Expr::Placeholder(_) => {
                 is_applicable = false;
                 Ok(VisitRecursion::Stop)
diff --git a/datafusion/core/src/physical_planner.rs 
b/datafusion/core/src/physical_planner.rs
index 6aa0c93fad..591a3e3131 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -124,6 +124,11 @@ fn physical_name(e: &Expr) -> Result<String> {
 
 fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
     match e {
+        Expr::Unnest(_) => {
+            internal_err!(
+                "Expr::Unnest should have been converted to LogicalPlan::Unnest"
+            )
+        }
         Expr::Column(c) => {
             if is_first_expr {
                 Ok(c.name.clone())
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index 0000f3df03..09de4b708d 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -180,6 +180,13 @@ pub enum Expr {
     /// A place holder which hold a reference to a qualified field
     /// in the outer query, used for correlated sub queries.
     OuterReferenceColumn(DataType, Column),
+    /// Unnest expression
+    Unnest(Unnest),
+}
+
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub struct Unnest {
+    pub exprs: Vec<Expr>,
 }
 
 /// Alias expression
@@ -917,6 +924,7 @@ impl Expr {
             Expr::TryCast { .. } => "TryCast",
             Expr::WindowFunction { .. } => "WindowFunction",
             Expr::Wildcard { .. } => "Wildcard",
+            Expr::Unnest { .. } => "Unnest",
         }
     }
 
@@ -1307,6 +1315,7 @@ impl Expr {
             | Expr::Negative(..)
             | Expr::OuterReferenceColumn(_, _)
             | Expr::TryCast(..)
+            | Expr::Unnest(..)
             | Expr::Wildcard { .. }
             | Expr::WindowFunction(..)
             | Expr::Literal(..)
@@ -1561,6 +1570,9 @@ impl fmt::Display for Expr {
                 }
             },
             Expr::Placeholder(Placeholder { id, .. }) => write!(f, "{id}"),
+            Expr::Unnest(Unnest { exprs }) => {
+                write!(f, "UNNEST({exprs:?})")
+            }
         }
     }
 }
@@ -1748,6 +1760,7 @@ fn create_name(e: &Expr) -> Result<String> {
                 }
             }
         }
+        Expr::Unnest(Unnest { exprs }) => Ok(format!("UNNEST({exprs:?})")),
         Expr::ScalarFunction(fun) => create_function_name(fun.name(), false, 
&fun.args),
         Expr::WindowFunction(WindowFunction {
             fun,
diff --git a/datafusion/expr/src/expr_rewriter/mod.rs 
b/datafusion/expr/src/expr_rewriter/mod.rs
index 1f04c80833..3f7388c3c3 100644
--- a/datafusion/expr/src/expr_rewriter/mod.rs
+++ b/datafusion/expr/src/expr_rewriter/mod.rs
@@ -17,7 +17,7 @@
 
 //! Expression rewriter
 
-use crate::expr::Alias;
+use crate::expr::{Alias, Unnest};
 use crate::logical_plan::Projection;
 use crate::{Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder};
 use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
@@ -75,6 +75,16 @@ pub fn normalize_col_with_schemas_and_ambiguity_check(
     schemas: &[&[&DFSchema]],
     using_columns: &[HashSet<Column>],
 ) -> Result<Expr> {
+    // Normalize column inside Unnest
+    if let Expr::Unnest(Unnest { exprs }) = expr {
+        let e = normalize_col_with_schemas_and_ambiguity_check(
+            exprs[0].clone(),
+            schemas,
+            using_columns,
+        )?;
+        return Ok(Expr::Unnest(Unnest { exprs: vec![e] }));
+    }
+
     expr.transform(&|expr| {
         Ok({
             if let Expr::Column(c) = expr {
diff --git a/datafusion/expr/src/expr_schema.rs 
b/datafusion/expr/src/expr_schema.rs
index 865279ab9a..d30f304a26 100644
--- a/datafusion/expr/src/expr_schema.rs
+++ b/datafusion/expr/src/expr_schema.rs
@@ -19,7 +19,7 @@ use super::{Between, Expr, Like};
 use crate::expr::{
     AggregateFunction, AggregateFunctionDefinition, Alias, BinaryExpr, Cast,
     GetFieldAccess, GetIndexedField, InList, InSubquery, Placeholder, 
ScalarFunction,
-    ScalarFunctionDefinition, Sort, TryCast, WindowFunction,
+    ScalarFunctionDefinition, Sort, TryCast, Unnest, WindowFunction,
 };
 use crate::field_util::GetFieldAccessSchema;
 use crate::type_coercion::binary::get_result_type;
@@ -82,6 +82,13 @@ impl ExprSchemable for Expr {
             Expr::Case(case) => case.when_then_expr[0].1.get_type(schema),
             Expr::Cast(Cast { data_type, .. })
             | Expr::TryCast(TryCast { data_type, .. }) => 
Ok(data_type.clone()),
+            Expr::Unnest(Unnest { exprs }) => {
+                let arg_data_types = exprs
+                    .iter()
+                    .map(|e| e.get_type(schema))
+                    .collect::<Result<Vec<_>>>()?;
+                Ok(arg_data_types[0].clone())
+            }
             Expr::ScalarFunction(ScalarFunction { func_def, args }) => {
                 let arg_data_types = args
                     .iter()
@@ -250,6 +257,7 @@ impl ExprSchemable for Expr {
             | Expr::ScalarFunction(..)
             | Expr::WindowFunction { .. }
             | Expr::AggregateFunction { .. }
+            | Expr::Unnest(_)
             | Expr::Placeholder(_) => Ok(true),
             Expr::IsNull(_)
             | Expr::IsNotNull(_)
diff --git a/datafusion/expr/src/tree_node/expr.rs 
b/datafusion/expr/src/tree_node/expr.rs
index 8b38d1cf01..add15b3d7a 100644
--- a/datafusion/expr/src/tree_node/expr.rs
+++ b/datafusion/expr/src/tree_node/expr.rs
@@ -20,7 +20,7 @@
 use crate::expr::{
     AggregateFunction, AggregateFunctionDefinition, Alias, Between, 
BinaryExpr, Case,
     Cast, GetIndexedField, GroupingSet, InList, InSubquery, Like, Placeholder,
-    ScalarFunction, ScalarFunctionDefinition, Sort, TryCast, WindowFunction,
+    ScalarFunction, ScalarFunctionDefinition, Sort, TryCast, Unnest, 
WindowFunction,
 };
 use crate::{Expr, GetFieldAccess};
 
@@ -33,7 +33,7 @@ impl TreeNode for Expr {
         op: &mut F,
     ) -> Result<VisitRecursion> {
         let children = match self {
-            Expr::Alias(Alias{expr, .. })
+            Expr::Alias(Alias{expr,..})
             | Expr::Not(expr)
             | Expr::IsNotNull(expr)
             | Expr::IsTrue(expr)
@@ -58,6 +58,7 @@ impl TreeNode for Expr {
                     GetFieldAccess::NamedStructField { .. } => vec![expr],
                 }
             }
+            Expr::Unnest(Unnest { exprs }) |
             Expr::GroupingSet(GroupingSet::Rollup(exprs))
             | Expr::GroupingSet(GroupingSet::Cube(exprs)) => 
exprs.iter().collect(),
             Expr::ScalarFunction (ScalarFunction{ args, .. } )  => {
@@ -151,6 +152,7 @@ impl TreeNode for Expr {
             | Expr::Exists { .. }
             | Expr::ScalarSubquery(_)
             | Expr::ScalarVariable(_, _)
+            | Expr::Unnest(_)
             | Expr::Literal(_) => self,
             Expr::Alias(Alias {
                 expr,
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 5d011e097f..e855554f36 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -270,7 +270,8 @@ pub fn expr_to_columns(expr: &Expr, accum: &mut 
HashSet<Column>) -> Result<()> {
             // Use explicit pattern match instead of a default
             // implementation, so that in the future if someone adds
             // new Expr types, they will check here as well
-            Expr::ScalarVariable(_, _)
+            Expr::Unnest(_)
+            | Expr::ScalarVariable(_, _)
             | Expr::Alias(_)
             | Expr::Literal(_)
             | Expr::BinaryExpr { .. }
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs 
b/datafusion/optimizer/src/analyzer/type_coercion.rs
index d804edb0c5..662e0fc7c2 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -134,6 +134,9 @@ impl TreeNodeRewriter for TypeCoercionRewriter {
 
     fn mutate(&mut self, expr: Expr) -> Result<Expr> {
         match expr {
+            Expr::Unnest(_) => internal_err!(
+                "Unnest should be rewritten to LogicalPlan::Unnest before type coercion"
+            ),
             Expr::ScalarSubquery(Subquery {
                 subquery,
                 outer_ref_columns,
diff --git a/datafusion/optimizer/src/push_down_filter.rs 
b/datafusion/optimizer/src/push_down_filter.rs
index fc56cbb868..acdda68332 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -227,6 +227,7 @@ fn can_evaluate_as_join_condition(predicate: &Expr) -> 
Result<bool> {
         | Expr::InSubquery(_)
         | Expr::ScalarSubquery(_)
         | Expr::OuterReferenceColumn(_, _)
+        | Expr::Unnest(_)
         | Expr::ScalarFunction(datafusion_expr::expr::ScalarFunction {
             func_def: ScalarFunctionDefinition::UDF(_),
             ..
diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs 
b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
index 05a4f18e63..fe36790994 100644
--- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
+++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
@@ -483,6 +483,7 @@ impl<'a> ConstEvaluator<'a> {
                 ScalarFunctionDefinition::Name(_) => false,
             },
             Expr::Literal(_)
+            | Expr::Unnest(_)
             | Expr::BinaryExpr { .. }
             | Expr::Not(_)
             | Expr::IsNotNull(_)
diff --git a/datafusion/proto/proto/datafusion.proto 
b/datafusion/proto/proto/datafusion.proto
index f2b5c5dd42..2183996948 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -426,6 +426,8 @@ message LogicalExprNode {
 
     PlaceholderNode placeholder = 34;
 
+    Unnest unnest = 35;
+
   }
 }
 
@@ -531,6 +533,10 @@ message NegativeNode {
   LogicalExprNode expr = 1;
 }
 
+message Unnest {
+  repeated LogicalExprNode exprs = 1;
+}
+
 message InListNode {
   LogicalExprNode expr = 1;
   repeated LogicalExprNode list = 2;
diff --git a/datafusion/proto/src/generated/pbjson.rs 
b/datafusion/proto/src/generated/pbjson.rs
index b9a8c5fc07..450b18dc09 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -13359,6 +13359,9 @@ impl serde::Serialize for LogicalExprNode {
                 logical_expr_node::ExprType::Placeholder(v) => {
                     struct_ser.serialize_field("placeholder", v)?;
                 }
+                logical_expr_node::ExprType::Unnest(v) => {
+                    struct_ser.serialize_field("unnest", v)?;
+                }
             }
         }
         struct_ser.end()
@@ -13426,6 +13429,7 @@ impl<'de> serde::Deserialize<'de> for LogicalExprNode {
             "similar_to",
             "similarTo",
             "placeholder",
+            "unnest",
         ];
 
         #[allow(clippy::enum_variant_names)]
@@ -13464,6 +13468,7 @@ impl<'de> serde::Deserialize<'de> for LogicalExprNode {
             Ilike,
             SimilarTo,
             Placeholder,
+            Unnest,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
@@ -13519,6 +13524,7 @@ impl<'de> serde::Deserialize<'de> for LogicalExprNode {
                             "ilike" => Ok(GeneratedField::Ilike),
                             "similarTo" | "similar_to" => 
Ok(GeneratedField::SimilarTo),
                             "placeholder" => Ok(GeneratedField::Placeholder),
+                            "unnest" => Ok(GeneratedField::Unnest),
                             _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
                         }
                     }
@@ -13777,6 +13783,13 @@ impl<'de> serde::Deserialize<'de> for LogicalExprNode {
                                 return 
Err(serde::de::Error::duplicate_field("placeholder"));
                             }
                             expr_type__ = 
map_.next_value::<::std::option::Option<_>>()?.map(logical_expr_node::ExprType::Placeholder)
+;
+                        }
+                        GeneratedField::Unnest => {
+                            if expr_type__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("unnest"));
+                            }
+                            expr_type__ = 
map_.next_value::<::std::option::Option<_>>()?.map(logical_expr_node::ExprType::Unnest)
 ;
                         }
                     }
@@ -26752,6 +26765,97 @@ impl<'de> serde::Deserialize<'de> for UniqueConstraint 
{
         deserializer.deserialize_struct("datafusion.UniqueConstraint", FIELDS, 
GeneratedVisitor)
     }
 }
+impl serde::Serialize for Unnest {
+    #[allow(deprecated)]
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
+    where
+        S: serde::Serializer,
+    {
+        use serde::ser::SerializeStruct;
+        let mut len = 0;
+        if !self.exprs.is_empty() {
+            len += 1;
+        }
+        let mut struct_ser = serializer.serialize_struct("datafusion.Unnest", 
len)?;
+        if !self.exprs.is_empty() {
+            struct_ser.serialize_field("exprs", &self.exprs)?;
+        }
+        struct_ser.end()
+    }
+}
+impl<'de> serde::Deserialize<'de> for Unnest {
+    #[allow(deprecated)]
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        const FIELDS: &[&str] = &[
+            "exprs",
+        ];
+
+        #[allow(clippy::enum_variant_names)]
+        enum GeneratedField {
+            Exprs,
+        }
+        impl<'de> serde::Deserialize<'de> for GeneratedField {
+            fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
+            where
+                D: serde::Deserializer<'de>,
+            {
+                struct GeneratedVisitor;
+
+                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+                    type Value = GeneratedField;
+
+                    fn expecting(&self, formatter: &mut 
std::fmt::Formatter<'_>) -> std::fmt::Result {
+                        write!(formatter, "expected one of: {:?}", &FIELDS)
+                    }
+
+                    #[allow(unused_variables)]
+                    fn visit_str<E>(self, value: &str) -> 
std::result::Result<GeneratedField, E>
+                    where
+                        E: serde::de::Error,
+                    {
+                        match value {
+                            "exprs" => Ok(GeneratedField::Exprs),
+                            _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
+                        }
+                    }
+                }
+                deserializer.deserialize_identifier(GeneratedVisitor)
+            }
+        }
+        struct GeneratedVisitor;
+        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+            type Value = Unnest;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> 
std::fmt::Result {
+                formatter.write_str("struct datafusion.Unnest")
+            }
+
+            fn visit_map<V>(self, mut map_: V) -> std::result::Result<Unnest, 
V::Error>
+                where
+                    V: serde::de::MapAccess<'de>,
+            {
+                let mut exprs__ = None;
+                while let Some(k) = map_.next_key()? {
+                    match k {
+                        GeneratedField::Exprs => {
+                            if exprs__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("exprs"));
+                            }
+                            exprs__ = Some(map_.next_value()?);
+                        }
+                    }
+                }
+                Ok(Unnest {
+                    exprs: exprs__.unwrap_or_default(),
+                })
+            }
+        }
+        deserializer.deserialize_struct("datafusion.Unnest", FIELDS, 
GeneratedVisitor)
+    }
+}
 impl serde::Serialize for ValuesNode {
     #[allow(deprecated)]
     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
diff --git a/datafusion/proto/src/generated/prost.rs 
b/datafusion/proto/src/generated/prost.rs
index 758ef2dcb5..7894285129 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -585,7 +585,7 @@ pub struct SubqueryAliasNode {
 pub struct LogicalExprNode {
     #[prost(
         oneof = "logical_expr_node::ExprType",
-        tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 
19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34"
+        tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 
19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35"
     )]
     pub expr_type: ::core::option::Option<logical_expr_node::ExprType>,
 }
@@ -670,6 +670,8 @@ pub mod logical_expr_node {
         SimilarTo(::prost::alloc::boxed::Box<super::SimilarToNode>),
         #[prost(message, tag = "34")]
         Placeholder(super::PlaceholderNode),
+        #[prost(message, tag = "35")]
+        Unnest(super::Unnest),
     }
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
@@ -836,6 +838,12 @@ pub struct NegativeNode {
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
+pub struct Unnest {
+    #[prost(message, repeated, tag = "1")]
+    pub exprs: ::prost::alloc::vec::Vec<LogicalExprNode>,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
 pub struct InListNode {
     #[prost(message, optional, boxed, tag = "1")]
     pub expr: 
::core::option::Option<::prost::alloc::boxed::Box<LogicalExprNode>>,
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs 
b/datafusion/proto/src/logical_plan/from_proto.rs
index decf3b1874..8ef7271ff2 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -44,6 +44,7 @@ use datafusion_common::{
     Constraints, DFField, DFSchema, DFSchemaRef, DataFusionError, 
OwnedTableReference,
     Result, ScalarValue,
 };
+use datafusion_expr::expr::Unnest;
 use datafusion_expr::window_frame::{check_window_frame, 
regularize_window_order_by};
 use datafusion_expr::{
     abs, acos, acosh, array, array_append, array_concat, array_dims, 
array_distinct,
@@ -1339,6 +1340,14 @@ pub fn parse_expr(
         ExprType::Negative(negative) => Ok(Expr::Negative(Box::new(
             parse_required_expr(negative.expr.as_deref(), registry, "expr")?,
         ))),
+        ExprType::Unnest(unnest) => {
+            let exprs = unnest
+                .exprs
+                .iter()
+                .map(|e| parse_expr(e, registry))
+                .collect::<Result<Vec<_>, _>>()?;
+            Ok(Expr::Unnest(Unnest { exprs }))
+        }
         ExprType::InList(in_list) => Ok(Expr::InList(InList::new(
             Box::new(parse_required_expr(
                 in_list.expr.as_deref(),
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs 
b/datafusion/proto/src/logical_plan/to_proto.rs
index e094994840..e5948de40a 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -48,7 +48,7 @@ use datafusion_common::{
 use datafusion_expr::expr::{
     self, AggregateFunctionDefinition, Alias, Between, BinaryExpr, Cast, 
GetFieldAccess,
     GetIndexedField, GroupingSet, InList, Like, Placeholder, ScalarFunction,
-    ScalarFunctionDefinition, Sort,
+    ScalarFunctionDefinition, Sort, Unnest,
 };
 use datafusion_expr::{
     logical_plan::PlanType, logical_plan::StringifiedPlan, AggregateFunction,
@@ -987,6 +987,18 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
                     expr_type: Some(ExprType::Negative(expr)),
                 }
             }
+            Expr::Unnest(Unnest { exprs }) => {
+                let expr = protobuf::Unnest {
+                    exprs: exprs.iter().map(|expr| 
expr.try_into()).collect::<Result<
+                        Vec<_>,
+                        Error,
+                    >>(
+                    )?,
+                };
+                Self {
+                    expr_type: Some(ExprType::Unnest(expr)),
+                }
+            }
             Expr::InList(InList {
                 expr,
                 list,
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 0db086419a..1bcdffe892 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -47,7 +47,7 @@ use datafusion_common::{FileType, Result};
 use datafusion_expr::dml::{CopyOptions, CopyTo};
 use datafusion_expr::expr::{
     self, Between, BinaryExpr, Case, Cast, GroupingSet, InList, Like, 
ScalarFunction,
-    Sort,
+    Sort, Unnest,
 };
 use datafusion_expr::logical_plan::{Extension, UserDefinedLogicalNodeCore};
 use datafusion_expr::{
@@ -1463,6 +1463,16 @@ fn roundtrip_inlist() {
     roundtrip_expr_test(test_expr, ctx);
 }
 
+#[test]
+fn roundtrip_unnest() {
+    let test_expr = Expr::Unnest(Unnest {
+        exprs: vec![lit(1), lit(2), lit(3)],
+    });
+
+    let ctx = SessionContext::new();
+    roundtrip_expr_test(test_expr, ctx);
+}
+
 #[test]
 fn roundtrip_wildcard() {
     let test_expr = Expr::Wildcard { qualifier: None };
diff --git a/datafusion/sql/src/expr/function.rs 
b/datafusion/sql/src/expr/function.rs
index 30f8605c39..3187f26dcc 100644
--- a/datafusion/sql/src/expr/function.rs
+++ b/datafusion/sql/src/expr/function.rs
@@ -17,15 +17,15 @@
 
 use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
 use datafusion_common::{
-    not_impl_err, plan_datafusion_err, plan_err, DFSchema, DataFusionError, 
Dependency,
-    Result,
+    exec_err, not_impl_err, plan_datafusion_err, plan_err, DFSchema, 
DataFusionError,
+    Dependency, Result,
 };
-use datafusion_expr::expr::ScalarFunction;
+use datafusion_expr::expr::{ScalarFunction, Unnest};
 use datafusion_expr::function::suggest_valid_function;
 use datafusion_expr::window_frame::{check_window_frame, 
regularize_window_order_by};
 use datafusion_expr::{
-    expr, AggregateFunction, BuiltinScalarFunction, Expr, WindowFrame,
-    WindowFunctionDefinition,
+    expr, AggregateFunction, BuiltinScalarFunction, Expr, 
ScalarFunctionDefinition,
+    WindowFrame, WindowFunctionDefinition,
 };
 use sqlparser::ast::{
     Expr as SQLExpr, Function as SQLFunction, FunctionArg, FunctionArgExpr, 
WindowType,
@@ -70,6 +70,50 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             return Ok(Expr::ScalarFunction(ScalarFunction::new_udf(fm, args)));
         }
 
+        // Build Unnest expression
+        if name.eq("unnest") {
+            let exprs =
+                self.function_args_to_expr(args.clone(), schema, 
planner_context)?;
+
+            match exprs.len() {
+                0 => {
+                    return exec_err!("unnest() requires at least one argument");
+                }
+                1 => {
+                    if let Expr::ScalarFunction(ScalarFunction {
+                        func_def:
+                            ScalarFunctionDefinition::BuiltIn(
+                                BuiltinScalarFunction::MakeArray,
+                            ),
+                        ..
+                    }) = exprs[0]
+                    {
+                        // valid
+                    } else if let Expr::Column(_) = exprs[0] {
+                        // valid
+                    } else if let Expr::ScalarFunction(ScalarFunction {
+                        func_def:
+                            
ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::Struct),
+                        ..
+                    }) = exprs[0]
+                    {
+                        return not_impl_err!("unnest() does not support struct yet");
+                    } else {
+                        return plan_err!(
+                            "unnest() can only be applied to array and structs and null"
+                        );
+                    }
+                }
+                _ => {
+                    return not_impl_err!(
+                        "unnest() does not support multiple arguments yet"
+                    );
+                }
+            }
+
+            return Ok(Expr::Unnest(Unnest { exprs }));
+        }
+
         // next, scalar built-in
         if let Ok(fun) = BuiltinScalarFunction::from_str(&name) {
             let args = self.function_args_to_expr(args, schema, 
planner_context)?;
diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs
index a0819e4aaf..7862715e5f 100644
--- a/datafusion/sql/src/select.rs
+++ b/datafusion/sql/src/select.rs
@@ -26,11 +26,10 @@ use crate::utils::{
 
 use datafusion_common::Column;
 use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
-use datafusion_expr::expr::Alias;
+use datafusion_expr::expr::{Alias, Unnest};
 use datafusion_expr::expr_rewriter::{
     normalize_col, normalize_col_with_schemas_and_ambiguity_check,
 };
-use datafusion_expr::logical_plan::builder::project;
 use datafusion_expr::utils::{
     expand_qualified_wildcard, expand_wildcard, expr_as_column_expr, 
expr_to_columns,
     find_aggregate_exprs, find_window_exprs,
@@ -221,8 +220,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             plan
         };
 
-        // final projection
-        let plan = project(plan, select_exprs_post_aggr)?;
+        // try process unnest expression or do the final projection
+        let plan = self.try_process_unnest(plan, select_exprs_post_aggr)?;
 
         // process distinct clause
         let plan = match select.distinct {
@@ -275,6 +274,40 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         Ok(plan)
     }
 
+    // Try converting Expr::Unnest to LogicalPlan::Unnest if possible, otherwise do the final projection
+    fn try_process_unnest(
+        &self,
+        input: LogicalPlan,
+        select_exprs: Vec<Expr>,
+    ) -> Result<LogicalPlan> {
+        let mut exprs_to_unnest = vec![];
+
+        for expr in select_exprs.iter() {
+            if let Expr::Unnest(Unnest { exprs }) = expr {
+                exprs_to_unnest.push(exprs[0].clone());
+            }
+        }
+
+        // Do the final projection
+        if exprs_to_unnest.is_empty() {
+            LogicalPlanBuilder::from(input)
+                .project(select_exprs)?
+                .build()
+        } else {
+            if exprs_to_unnest.len() > 1 {
+                return not_impl_err!("Only support single unnest expression for now");
+            }
+
+            let expr = exprs_to_unnest[0].clone();
+            let column = expr.display_name()?;
+
+            LogicalPlanBuilder::from(input)
+                .project(vec![expr])?
+                .unnest_column(column)?
+                .build()
+        }
+    }
+
     fn plan_selection(
         &self,
         selection: Option<SQLExpr>,
diff --git a/datafusion/sqllogictest/test_files/unnest.slt 
b/datafusion/sqllogictest/test_files/unnest.slt
new file mode 100644
index 0000000000..7e4ce06be2
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/unnest.slt
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+#   http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+############################
+# Unnest Expressions Tests #
+############################
+
+statement ok
+CREATE TABLE unnest_table
+AS VALUES
+    ([1,2,3], [7], 1),
+    ([4,5], [8,9,10], 2),
+    ([6], [11,12], 3),
+    ([12], [null, 42, null], null)
+;
+
+## Basic unnest expression
+query I
+select unnest([1,2,3]);
+----
+1
+2
+3
+
+query error DataFusion error: Error during planning: unnest\(\) can only be applied to array and structs and null
+select unnest(null);
+
+## Unnest empty array
+query ?
+select unnest([]);
+----
+
+## Unnest column non-null
+query I
+select unnest(column1) from unnest_table;
+----
+1
+2
+3
+4
+5
+6
+12
+
+## Unnest column with null
+query I
+select unnest(column2) from unnest_table;
+----
+7
+8
+9
+10
+11
+12
+NULL
+42
+NULL
+
+## Unnest column with scalars
+# TODO: This should be an error, but unnest is able to process scalar values now.
+query I
+select unnest(column3) from unnest_table;
+----
+1
+2
+3
+NULL
+
+## Unnest multiple columns
+query error DataFusion error: This feature is not implemented: Only support single unnest expression for now
+select unnest(column1), unnest(column2) from unnest_table;
+
+## Unnest scalar
+query error DataFusion error: Error during planning: unnest\(\) can only be applied to array and structs and null
+select unnest(1);
+
+
+## Unnest empty expression
+query error DataFusion error: Execution error: unnest\(\) requires at least one argument
+select unnest();
+
+statement ok
+drop table unnest_table;

Reply via email to