This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 24197d706e Unnest with single expression (#9069)
24197d706e is described below
commit 24197d706e5d899973f77639aa07058fc51fd33f
Author: Jay Zhan <[email protected]>
AuthorDate: Sun Feb 4 21:24:42 2024 +0800
Unnest with single expression (#9069)
* introduce basic unnest
Signed-off-by: jayzhan211 <[email protected]>
* proto
Signed-off-by: jayzhan211 <[email protected]>
* cleanup
Signed-off-by: jayzhan211 <[email protected]>
* cleanup
Signed-off-by: jayzhan211 <[email protected]>
* cleanup
Signed-off-by: jayzhan211 <[email protected]>
* rename
Signed-off-by: jayzhan211 <[email protected]>
* change panic to internal error
Signed-off-by: jayzhan211 <[email protected]>
* cleanup
Signed-off-by: jayzhan211 <[email protected]>
* typo
Signed-off-by: jayzhan211 <[email protected]>
* more err handling
Signed-off-by: jayzhan211 <[email protected]>
* fix reimported
Signed-off-by: jayzhan211 <[email protected]>
* comments and fix typo
Signed-off-by: jayzhan211 <[email protected]>
* Add tests for nulls
---------
Signed-off-by: jayzhan211 <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/core/src/datasource/listing/helpers.rs | 1 +
datafusion/core/src/physical_planner.rs | 5 +
datafusion/expr/src/expr.rs | 13 +++
datafusion/expr/src/expr_rewriter/mod.rs | 12 ++-
datafusion/expr/src/expr_schema.rs | 10 +-
datafusion/expr/src/tree_node/expr.rs | 6 +-
datafusion/expr/src/utils.rs | 3 +-
datafusion/optimizer/src/analyzer/type_coercion.rs | 3 +
datafusion/optimizer/src/push_down_filter.rs | 1 +
.../src/simplify_expressions/expr_simplifier.rs | 1 +
datafusion/proto/proto/datafusion.proto | 6 ++
datafusion/proto/src/generated/pbjson.rs | 104 +++++++++++++++++++++
datafusion/proto/src/generated/prost.rs | 10 +-
datafusion/proto/src/logical_plan/from_proto.rs | 9 ++
datafusion/proto/src/logical_plan/to_proto.rs | 14 ++-
.../proto/tests/cases/roundtrip_logical_plan.rs | 12 ++-
datafusion/sql/src/expr/function.rs | 54 ++++++++++-
datafusion/sql/src/select.rs | 41 +++++++-
datafusion/sqllogictest/test_files/unnest.slt | 97 +++++++++++++++++++
19 files changed, 385 insertions(+), 17 deletions(-)
diff --git a/datafusion/core/src/datasource/listing/helpers.rs
b/datafusion/core/src/datasource/listing/helpers.rs
index a03bcec7ab..077356b716 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -126,6 +126,7 @@ pub fn expr_applicable_for_cols(col_names: &[String], expr:
&Expr) -> bool {
| Expr::Sort { .. }
| Expr::WindowFunction { .. }
| Expr::Wildcard { .. }
+ | Expr::Unnest { .. }
| Expr::Placeholder(_) => {
is_applicable = false;
Ok(VisitRecursion::Stop)
diff --git a/datafusion/core/src/physical_planner.rs
b/datafusion/core/src/physical_planner.rs
index 6aa0c93fad..591a3e3131 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -124,6 +124,11 @@ fn physical_name(e: &Expr) -> Result<String> {
fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
match e {
+ Expr::Unnest(_) => {
+ internal_err!(
+ "Expr::Unnest should have been converted to
LogicalPlan::Unnest"
+ )
+ }
Expr::Column(c) => {
if is_first_expr {
Ok(c.name.clone())
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index 0000f3df03..09de4b708d 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -180,6 +180,13 @@ pub enum Expr {
/// A place holder which hold a reference to a qualified field
/// in the outer query, used for correlated sub queries.
OuterReferenceColumn(DataType, Column),
+ /// Unnest expression
+ Unnest(Unnest),
+}
+
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub struct Unnest {
+ pub exprs: Vec<Expr>,
}
/// Alias expression
@@ -917,6 +924,7 @@ impl Expr {
Expr::TryCast { .. } => "TryCast",
Expr::WindowFunction { .. } => "WindowFunction",
Expr::Wildcard { .. } => "Wildcard",
+ Expr::Unnest { .. } => "Unnest",
}
}
@@ -1307,6 +1315,7 @@ impl Expr {
| Expr::Negative(..)
| Expr::OuterReferenceColumn(_, _)
| Expr::TryCast(..)
+ | Expr::Unnest(..)
| Expr::Wildcard { .. }
| Expr::WindowFunction(..)
| Expr::Literal(..)
@@ -1561,6 +1570,9 @@ impl fmt::Display for Expr {
}
},
Expr::Placeholder(Placeholder { id, .. }) => write!(f, "{id}"),
+ Expr::Unnest(Unnest { exprs }) => {
+ write!(f, "UNNEST({exprs:?})")
+ }
}
}
}
@@ -1748,6 +1760,7 @@ fn create_name(e: &Expr) -> Result<String> {
}
}
}
+ Expr::Unnest(Unnest { exprs }) => Ok(format!("UNNEST({exprs:?})")),
Expr::ScalarFunction(fun) => create_function_name(fun.name(), false,
&fun.args),
Expr::WindowFunction(WindowFunction {
fun,
diff --git a/datafusion/expr/src/expr_rewriter/mod.rs
b/datafusion/expr/src/expr_rewriter/mod.rs
index 1f04c80833..3f7388c3c3 100644
--- a/datafusion/expr/src/expr_rewriter/mod.rs
+++ b/datafusion/expr/src/expr_rewriter/mod.rs
@@ -17,7 +17,7 @@
//! Expression rewriter
-use crate::expr::Alias;
+use crate::expr::{Alias, Unnest};
use crate::logical_plan::Projection;
use crate::{Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder};
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
@@ -75,6 +75,16 @@ pub fn normalize_col_with_schemas_and_ambiguity_check(
schemas: &[&[&DFSchema]],
using_columns: &[HashSet<Column>],
) -> Result<Expr> {
+ // Normalize column inside Unnest
+ if let Expr::Unnest(Unnest { exprs }) = expr {
+ let e = normalize_col_with_schemas_and_ambiguity_check(
+ exprs[0].clone(),
+ schemas,
+ using_columns,
+ )?;
+ return Ok(Expr::Unnest(Unnest { exprs: vec![e] }));
+ }
+
expr.transform(&|expr| {
Ok({
if let Expr::Column(c) = expr {
diff --git a/datafusion/expr/src/expr_schema.rs
b/datafusion/expr/src/expr_schema.rs
index 865279ab9a..d30f304a26 100644
--- a/datafusion/expr/src/expr_schema.rs
+++ b/datafusion/expr/src/expr_schema.rs
@@ -19,7 +19,7 @@ use super::{Between, Expr, Like};
use crate::expr::{
AggregateFunction, AggregateFunctionDefinition, Alias, BinaryExpr, Cast,
GetFieldAccess, GetIndexedField, InList, InSubquery, Placeholder,
ScalarFunction,
- ScalarFunctionDefinition, Sort, TryCast, WindowFunction,
+ ScalarFunctionDefinition, Sort, TryCast, Unnest, WindowFunction,
};
use crate::field_util::GetFieldAccessSchema;
use crate::type_coercion::binary::get_result_type;
@@ -82,6 +82,13 @@ impl ExprSchemable for Expr {
Expr::Case(case) => case.when_then_expr[0].1.get_type(schema),
Expr::Cast(Cast { data_type, .. })
| Expr::TryCast(TryCast { data_type, .. }) =>
Ok(data_type.clone()),
+ Expr::Unnest(Unnest { exprs }) => {
+ let arg_data_types = exprs
+ .iter()
+ .map(|e| e.get_type(schema))
+ .collect::<Result<Vec<_>>>()?;
+ Ok(arg_data_types[0].clone())
+ }
Expr::ScalarFunction(ScalarFunction { func_def, args }) => {
let arg_data_types = args
.iter()
@@ -250,6 +257,7 @@ impl ExprSchemable for Expr {
| Expr::ScalarFunction(..)
| Expr::WindowFunction { .. }
| Expr::AggregateFunction { .. }
+ | Expr::Unnest(_)
| Expr::Placeholder(_) => Ok(true),
Expr::IsNull(_)
| Expr::IsNotNull(_)
diff --git a/datafusion/expr/src/tree_node/expr.rs
b/datafusion/expr/src/tree_node/expr.rs
index 8b38d1cf01..add15b3d7a 100644
--- a/datafusion/expr/src/tree_node/expr.rs
+++ b/datafusion/expr/src/tree_node/expr.rs
@@ -20,7 +20,7 @@
use crate::expr::{
AggregateFunction, AggregateFunctionDefinition, Alias, Between,
BinaryExpr, Case,
Cast, GetIndexedField, GroupingSet, InList, InSubquery, Like, Placeholder,
- ScalarFunction, ScalarFunctionDefinition, Sort, TryCast, WindowFunction,
+ ScalarFunction, ScalarFunctionDefinition, Sort, TryCast, Unnest,
WindowFunction,
};
use crate::{Expr, GetFieldAccess};
@@ -33,7 +33,7 @@ impl TreeNode for Expr {
op: &mut F,
) -> Result<VisitRecursion> {
let children = match self {
- Expr::Alias(Alias{expr, .. })
+ Expr::Alias(Alias{expr,..})
| Expr::Not(expr)
| Expr::IsNotNull(expr)
| Expr::IsTrue(expr)
@@ -58,6 +58,7 @@ impl TreeNode for Expr {
GetFieldAccess::NamedStructField { .. } => vec![expr],
}
}
+ Expr::Unnest(Unnest { exprs }) |
Expr::GroupingSet(GroupingSet::Rollup(exprs))
| Expr::GroupingSet(GroupingSet::Cube(exprs)) =>
exprs.iter().collect(),
Expr::ScalarFunction (ScalarFunction{ args, .. } ) => {
@@ -151,6 +152,7 @@ impl TreeNode for Expr {
| Expr::Exists { .. }
| Expr::ScalarSubquery(_)
| Expr::ScalarVariable(_, _)
+ | Expr::Unnest(_)
| Expr::Literal(_) => self,
Expr::Alias(Alias {
expr,
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 5d011e097f..e855554f36 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -270,7 +270,8 @@ pub fn expr_to_columns(expr: &Expr, accum: &mut
HashSet<Column>) -> Result<()> {
// Use explicit pattern match instead of a default
// implementation, so that in the future if someone adds
// new Expr types, they will check here as well
- Expr::ScalarVariable(_, _)
+ Expr::Unnest(_)
+ | Expr::ScalarVariable(_, _)
| Expr::Alias(_)
| Expr::Literal(_)
| Expr::BinaryExpr { .. }
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs
b/datafusion/optimizer/src/analyzer/type_coercion.rs
index d804edb0c5..662e0fc7c2 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -134,6 +134,9 @@ impl TreeNodeRewriter for TypeCoercionRewriter {
fn mutate(&mut self, expr: Expr) -> Result<Expr> {
match expr {
+ Expr::Unnest(_) => internal_err!(
+ "Unnest should be rewritten to LogicalPlan::Unnest before type
coercion"
+ ),
Expr::ScalarSubquery(Subquery {
subquery,
outer_ref_columns,
diff --git a/datafusion/optimizer/src/push_down_filter.rs
b/datafusion/optimizer/src/push_down_filter.rs
index fc56cbb868..acdda68332 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -227,6 +227,7 @@ fn can_evaluate_as_join_condition(predicate: &Expr) ->
Result<bool> {
| Expr::InSubquery(_)
| Expr::ScalarSubquery(_)
| Expr::OuterReferenceColumn(_, _)
+ | Expr::Unnest(_)
| Expr::ScalarFunction(datafusion_expr::expr::ScalarFunction {
func_def: ScalarFunctionDefinition::UDF(_),
..
diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
index 05a4f18e63..fe36790994 100644
--- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
+++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
@@ -483,6 +483,7 @@ impl<'a> ConstEvaluator<'a> {
ScalarFunctionDefinition::Name(_) => false,
},
Expr::Literal(_)
+ | Expr::Unnest(_)
| Expr::BinaryExpr { .. }
| Expr::Not(_)
| Expr::IsNotNull(_)
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index f2b5c5dd42..2183996948 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -426,6 +426,8 @@ message LogicalExprNode {
PlaceholderNode placeholder = 34;
+ Unnest unnest = 35;
+
}
}
@@ -531,6 +533,10 @@ message NegativeNode {
LogicalExprNode expr = 1;
}
+message Unnest {
+ repeated LogicalExprNode exprs = 1;
+}
+
message InListNode {
LogicalExprNode expr = 1;
repeated LogicalExprNode list = 2;
diff --git a/datafusion/proto/src/generated/pbjson.rs
b/datafusion/proto/src/generated/pbjson.rs
index b9a8c5fc07..450b18dc09 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -13359,6 +13359,9 @@ impl serde::Serialize for LogicalExprNode {
logical_expr_node::ExprType::Placeholder(v) => {
struct_ser.serialize_field("placeholder", v)?;
}
+ logical_expr_node::ExprType::Unnest(v) => {
+ struct_ser.serialize_field("unnest", v)?;
+ }
}
}
struct_ser.end()
@@ -13426,6 +13429,7 @@ impl<'de> serde::Deserialize<'de> for LogicalExprNode {
"similar_to",
"similarTo",
"placeholder",
+ "unnest",
];
#[allow(clippy::enum_variant_names)]
@@ -13464,6 +13468,7 @@ impl<'de> serde::Deserialize<'de> for LogicalExprNode {
Ilike,
SimilarTo,
Placeholder,
+ Unnest,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
@@ -13519,6 +13524,7 @@ impl<'de> serde::Deserialize<'de> for LogicalExprNode {
"ilike" => Ok(GeneratedField::Ilike),
"similarTo" | "similar_to" =>
Ok(GeneratedField::SimilarTo),
"placeholder" => Ok(GeneratedField::Placeholder),
+ "unnest" => Ok(GeneratedField::Unnest),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
}
}
@@ -13777,6 +13783,13 @@ impl<'de> serde::Deserialize<'de> for LogicalExprNode {
return
Err(serde::de::Error::duplicate_field("placeholder"));
}
expr_type__ =
map_.next_value::<::std::option::Option<_>>()?.map(logical_expr_node::ExprType::Placeholder)
+;
+ }
+ GeneratedField::Unnest => {
+ if expr_type__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("unnest"));
+ }
+ expr_type__ =
map_.next_value::<::std::option::Option<_>>()?.map(logical_expr_node::ExprType::Unnest)
;
}
}
@@ -26752,6 +26765,97 @@ impl<'de> serde::Deserialize<'de> for UniqueConstraint
{
deserializer.deserialize_struct("datafusion.UniqueConstraint", FIELDS,
GeneratedVisitor)
}
}
+impl serde::Serialize for Unnest {
+ #[allow(deprecated)]
+ fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
+ where
+ S: serde::Serializer,
+ {
+ use serde::ser::SerializeStruct;
+ let mut len = 0;
+ if !self.exprs.is_empty() {
+ len += 1;
+ }
+ let mut struct_ser = serializer.serialize_struct("datafusion.Unnest",
len)?;
+ if !self.exprs.is_empty() {
+ struct_ser.serialize_field("exprs", &self.exprs)?;
+ }
+ struct_ser.end()
+ }
+}
+impl<'de> serde::Deserialize<'de> for Unnest {
+ #[allow(deprecated)]
+ fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ const FIELDS: &[&str] = &[
+ "exprs",
+ ];
+
+ #[allow(clippy::enum_variant_names)]
+ enum GeneratedField {
+ Exprs,
+ }
+ impl<'de> serde::Deserialize<'de> for GeneratedField {
+ fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ struct GeneratedVisitor;
+
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = GeneratedField;
+
+ fn expecting(&self, formatter: &mut
std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(formatter, "expected one of: {:?}", &FIELDS)
+ }
+
+ #[allow(unused_variables)]
+ fn visit_str<E>(self, value: &str) ->
std::result::Result<GeneratedField, E>
+ where
+ E: serde::de::Error,
+ {
+ match value {
+ "exprs" => Ok(GeneratedField::Exprs),
+ _ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
+ }
+ }
+ }
+ deserializer.deserialize_identifier(GeneratedVisitor)
+ }
+ }
+ struct GeneratedVisitor;
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = Unnest;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) ->
std::fmt::Result {
+ formatter.write_str("struct datafusion.Unnest")
+ }
+
+ fn visit_map<V>(self, mut map_: V) -> std::result::Result<Unnest,
V::Error>
+ where
+ V: serde::de::MapAccess<'de>,
+ {
+ let mut exprs__ = None;
+ while let Some(k) = map_.next_key()? {
+ match k {
+ GeneratedField::Exprs => {
+ if exprs__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("exprs"));
+ }
+ exprs__ = Some(map_.next_value()?);
+ }
+ }
+ }
+ Ok(Unnest {
+ exprs: exprs__.unwrap_or_default(),
+ })
+ }
+ }
+ deserializer.deserialize_struct("datafusion.Unnest", FIELDS,
GeneratedVisitor)
+ }
+}
impl serde::Serialize for ValuesNode {
#[allow(deprecated)]
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
diff --git a/datafusion/proto/src/generated/prost.rs
b/datafusion/proto/src/generated/prost.rs
index 758ef2dcb5..7894285129 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -585,7 +585,7 @@ pub struct SubqueryAliasNode {
pub struct LogicalExprNode {
#[prost(
oneof = "logical_expr_node::ExprType",
- tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18,
19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34"
+ tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18,
19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35"
)]
pub expr_type: ::core::option::Option<logical_expr_node::ExprType>,
}
@@ -670,6 +670,8 @@ pub mod logical_expr_node {
SimilarTo(::prost::alloc::boxed::Box<super::SimilarToNode>),
#[prost(message, tag = "34")]
Placeholder(super::PlaceholderNode),
+ #[prost(message, tag = "35")]
+ Unnest(super::Unnest),
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
@@ -836,6 +838,12 @@ pub struct NegativeNode {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct Unnest {
+ #[prost(message, repeated, tag = "1")]
+ pub exprs: ::prost::alloc::vec::Vec<LogicalExprNode>,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
pub struct InListNode {
#[prost(message, optional, boxed, tag = "1")]
pub expr:
::core::option::Option<::prost::alloc::boxed::Box<LogicalExprNode>>,
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs
b/datafusion/proto/src/logical_plan/from_proto.rs
index decf3b1874..8ef7271ff2 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -44,6 +44,7 @@ use datafusion_common::{
Constraints, DFField, DFSchema, DFSchemaRef, DataFusionError,
OwnedTableReference,
Result, ScalarValue,
};
+use datafusion_expr::expr::Unnest;
use datafusion_expr::window_frame::{check_window_frame,
regularize_window_order_by};
use datafusion_expr::{
abs, acos, acosh, array, array_append, array_concat, array_dims,
array_distinct,
@@ -1339,6 +1340,14 @@ pub fn parse_expr(
ExprType::Negative(negative) => Ok(Expr::Negative(Box::new(
parse_required_expr(negative.expr.as_deref(), registry, "expr")?,
))),
+ ExprType::Unnest(unnest) => {
+ let exprs = unnest
+ .exprs
+ .iter()
+ .map(|e| parse_expr(e, registry))
+ .collect::<Result<Vec<_>, _>>()?;
+ Ok(Expr::Unnest(Unnest { exprs }))
+ }
ExprType::InList(in_list) => Ok(Expr::InList(InList::new(
Box::new(parse_required_expr(
in_list.expr.as_deref(),
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs
b/datafusion/proto/src/logical_plan/to_proto.rs
index e094994840..e5948de40a 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -48,7 +48,7 @@ use datafusion_common::{
use datafusion_expr::expr::{
self, AggregateFunctionDefinition, Alias, Between, BinaryExpr, Cast,
GetFieldAccess,
GetIndexedField, GroupingSet, InList, Like, Placeholder, ScalarFunction,
- ScalarFunctionDefinition, Sort,
+ ScalarFunctionDefinition, Sort, Unnest,
};
use datafusion_expr::{
logical_plan::PlanType, logical_plan::StringifiedPlan, AggregateFunction,
@@ -987,6 +987,18 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
expr_type: Some(ExprType::Negative(expr)),
}
}
+ Expr::Unnest(Unnest { exprs }) => {
+ let expr = protobuf::Unnest {
+ exprs: exprs.iter().map(|expr|
expr.try_into()).collect::<Result<
+ Vec<_>,
+ Error,
+ >>(
+ )?,
+ };
+ Self {
+ expr_type: Some(ExprType::Unnest(expr)),
+ }
+ }
Expr::InList(InList {
expr,
list,
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 0db086419a..1bcdffe892 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -47,7 +47,7 @@ use datafusion_common::{FileType, Result};
use datafusion_expr::dml::{CopyOptions, CopyTo};
use datafusion_expr::expr::{
self, Between, BinaryExpr, Case, Cast, GroupingSet, InList, Like,
ScalarFunction,
- Sort,
+ Sort, Unnest,
};
use datafusion_expr::logical_plan::{Extension, UserDefinedLogicalNodeCore};
use datafusion_expr::{
@@ -1463,6 +1463,16 @@ fn roundtrip_inlist() {
roundtrip_expr_test(test_expr, ctx);
}
+#[test]
+fn roundtrip_unnest() {
+ let test_expr = Expr::Unnest(Unnest {
+ exprs: vec![lit(1), lit(2), lit(3)],
+ });
+
+ let ctx = SessionContext::new();
+ roundtrip_expr_test(test_expr, ctx);
+}
+
#[test]
fn roundtrip_wildcard() {
let test_expr = Expr::Wildcard { qualifier: None };
diff --git a/datafusion/sql/src/expr/function.rs
b/datafusion/sql/src/expr/function.rs
index 30f8605c39..3187f26dcc 100644
--- a/datafusion/sql/src/expr/function.rs
+++ b/datafusion/sql/src/expr/function.rs
@@ -17,15 +17,15 @@
use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use datafusion_common::{
- not_impl_err, plan_datafusion_err, plan_err, DFSchema, DataFusionError,
Dependency,
- Result,
+ exec_err, not_impl_err, plan_datafusion_err, plan_err, DFSchema,
DataFusionError,
+ Dependency, Result,
};
-use datafusion_expr::expr::ScalarFunction;
+use datafusion_expr::expr::{ScalarFunction, Unnest};
use datafusion_expr::function::suggest_valid_function;
use datafusion_expr::window_frame::{check_window_frame,
regularize_window_order_by};
use datafusion_expr::{
- expr, AggregateFunction, BuiltinScalarFunction, Expr, WindowFrame,
- WindowFunctionDefinition,
+ expr, AggregateFunction, BuiltinScalarFunction, Expr,
ScalarFunctionDefinition,
+ WindowFrame, WindowFunctionDefinition,
};
use sqlparser::ast::{
Expr as SQLExpr, Function as SQLFunction, FunctionArg, FunctionArgExpr,
WindowType,
@@ -70,6 +70,50 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
return Ok(Expr::ScalarFunction(ScalarFunction::new_udf(fm, args)));
}
+ // Build Unnest expression
+ if name.eq("unnest") {
+ let exprs =
+ self.function_args_to_expr(args.clone(), schema,
planner_context)?;
+
+ match exprs.len() {
+ 0 => {
+ return exec_err!("unnest() requires at least one
argument");
+ }
+ 1 => {
+ if let Expr::ScalarFunction(ScalarFunction {
+ func_def:
+ ScalarFunctionDefinition::BuiltIn(
+ BuiltinScalarFunction::MakeArray,
+ ),
+ ..
+ }) = exprs[0]
+ {
+ // valid
+ } else if let Expr::Column(_) = exprs[0] {
+ // valid
+ } else if let Expr::ScalarFunction(ScalarFunction {
+ func_def:
+
ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::Struct),
+ ..
+ }) = exprs[0]
+ {
+ return not_impl_err!("unnest() does not support struct
yet");
+ } else {
+ return plan_err!(
+ "unnest() can only be applied to array and structs
and null"
+ );
+ }
+ }
+ _ => {
+ return not_impl_err!(
+ "unnest() does not support multiple arguments yet"
+ );
+ }
+ }
+
+ return Ok(Expr::Unnest(Unnest { exprs }));
+ }
+
// next, scalar built-in
if let Ok(fun) = BuiltinScalarFunction::from_str(&name) {
let args = self.function_args_to_expr(args, schema,
planner_context)?;
diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs
index a0819e4aaf..7862715e5f 100644
--- a/datafusion/sql/src/select.rs
+++ b/datafusion/sql/src/select.rs
@@ -26,11 +26,10 @@ use crate::utils::{
use datafusion_common::Column;
use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
-use datafusion_expr::expr::Alias;
+use datafusion_expr::expr::{Alias, Unnest};
use datafusion_expr::expr_rewriter::{
normalize_col, normalize_col_with_schemas_and_ambiguity_check,
};
-use datafusion_expr::logical_plan::builder::project;
use datafusion_expr::utils::{
expand_qualified_wildcard, expand_wildcard, expr_as_column_expr,
expr_to_columns,
find_aggregate_exprs, find_window_exprs,
@@ -221,8 +220,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
plan
};
- // final projection
- let plan = project(plan, select_exprs_post_aggr)?;
+ // try process unnest expression or do the final projection
+ let plan = self.try_process_unnest(plan, select_exprs_post_aggr)?;
// process distinct clause
let plan = match select.distinct {
@@ -275,6 +274,57 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
 Ok(plan)
 }
+    /// Plans the final projection of a SELECT, converting a top-level
+    /// `Expr::Unnest` in the select list into a `LogicalPlan::Unnest`.
+    ///
+    /// Every select expression is projected. An `Expr::Unnest` item is
+    /// replaced by its single argument, and that column is then unnested, so
+    /// the remaining select items are kept and repeated for each unnested
+    /// element (e.g. `SELECT unnest(column1), column3 FROM t`).
+    ///
+    /// Only one `unnest` per select list, with exactly one argument, is
+    /// supported for now. An `unnest` nested inside another expression or an
+    /// alias is not rewritten here.
+    fn try_process_unnest(
+        &self,
+        input: LogicalPlan,
+        select_exprs: Vec<Expr>,
+    ) -> Result<LogicalPlan> {
+        let mut unnest_columns = vec![];
+        let mut projection_exprs = Vec::with_capacity(select_exprs.len());
+
+        for expr in select_exprs {
+            match expr {
+                Expr::Unnest(Unnest { mut exprs }) => {
+                    if exprs.len() != 1 {
+                        return plan_err!(
+                            "unnest() expects exactly one argument, got {}",
+                            exprs.len()
+                        );
+                    }
+                    // Project the argument itself; its display name is the
+                    // column that the Unnest node expands below.
+                    let arg = exprs.remove(0);
+                    unnest_columns.push(arg.display_name()?);
+                    projection_exprs.push(arg);
+                }
+                other => projection_exprs.push(other),
+            }
+        }
+
+        if unnest_columns.len() > 1 {
+            return not_impl_err!("Only support single unnest expression for now");
+        }
+
+        // Keep every non-unnest select item in the projection so it is carried
+        // through (and repeated per unnested element) instead of being dropped.
+        let builder = LogicalPlanBuilder::from(input).project(projection_exprs)?;
+        match unnest_columns.pop() {
+            Some(column) => builder.unnest_column(column)?.build(),
+            None => builder.build(),
+        }
+    }
+
fn plan_selection(
&self,
selection: Option<SQLExpr>,
diff --git a/datafusion/sqllogictest/test_files/unnest.slt
b/datafusion/sqllogictest/test_files/unnest.slt
new file mode 100644
index 0000000000..7e4ce06be2
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/unnest.slt
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+############################
+# Unnest Expressions Tests #
+############################
+
+statement ok
+CREATE TABLE unnest_table
+AS VALUES
+ ([1,2,3], [7], 1),
+ ([4,5], [8,9,10], 2),
+ ([6], [11,12], 3),
+ ([12], [null, 42, null], null)
+;
+
+## Basic unnest expression
+query I
+select unnest([1,2,3]);
+----
+1
+2
+3
+
+query error DataFusion error: Error during planning: unnest\(\) can only be
applied to array and structs and null
+select unnest(null);
+
+## Unnest empty array
+query ?
+select unnest([]);
+----
+
+## Unnest column non-null
+query I
+select unnest(column1) from unnest_table;
+----
+1
+2
+3
+4
+5
+6
+12
+
+## Unnest column with null
+query I
+select unnest(column2) from unnest_table;
+----
+7
+8
+9
+10
+11
+12
+NULL
+42
+NULL
+
+## Unnest column with scalars
+# TODO: This should be an error, but unnest is able to process scalar values
now.
+query I
+select unnest(column3) from unnest_table;
+----
+1
+2
+3
+NULL
+
+## Unnest multiple columns
+query error DataFusion error: This feature is not implemented: Only support
single unnest expression for now
+select unnest(column1), unnest(column2) from unnest_table;
+
+## Unnest scalar
+query error DataFusion error: Error during planning: unnest\(\) can only be
applied to array and structs and null
+select unnest(1);
+
+
+## Unnest empty expression
+query error DataFusion error: Execution error: unnest\(\) requires at least
one argument
+select unnest();
+
+statement ok
+drop table unnest_table;