This is an automated email from the ASF dual-hosted git repository.
jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 88187d4dcc move make_array array_append array_prepend array_concat
function to datafusion-functions-array crate (#9504)
88187d4dcc is described below
commit 88187d4dcc060d4d8ae7c680594e72fdfbc24c6a
Author: junxiangMu <[email protected]>
AuthorDate: Sun Mar 10 21:16:55 2024 +0800
move make_array array_append array_prepend array_concat function to
datafusion-functions-array crate (#9504)
* move array function
* fix rebase
Signed-off-by: jayzhan211 <[email protected]>
* cleanup to trigger rerun
Signed-off-by: jayzhan211 <[email protected]>
* split functions to different files
Signed-off-by: jayzhan211 <[email protected]>
* fix
Signed-off-by: jayzhan211 <[email protected]>
* fix conflict
Signed-off-by: jayzhan211 <[email protected]>
* clippy
Signed-off-by: jayzhan211 <[email protected]>
---------
Signed-off-by: jayzhan211 <[email protected]>
Co-authored-by: jayzhan211 <[email protected]>
---
datafusion-cli/Cargo.lock | 3 +
datafusion/expr/src/built_in_function.rs | 109 ------
datafusion/expr/src/expr_fn.rs | 23 --
datafusion/functions-array/Cargo.toml | 3 +
datafusion/functions-array/src/concat.rs | 436 +++++++++++++++++++++
datafusion/functions-array/src/kernels.rs | 4 +-
datafusion/functions-array/src/lib.rs | 10 +
datafusion/functions-array/src/macros.rs | 30 ++
datafusion/functions-array/src/make_array.rs | 221 +++++++++++
datafusion/functions-array/src/udf.rs | 18 +-
datafusion/functions-array/src/utils.rs | 129 +++++-
datafusion/optimizer/src/analyzer/rewrite_expr.rs | 159 +++-----
datafusion/optimizer/src/analyzer/type_coercion.rs | 29 +-
datafusion/physical-expr/src/array_expressions.rs | 306 +--------------
datafusion/physical-expr/src/functions.rs | 12 -
datafusion/physical-expr/src/scalar_function.rs | 7 +-
datafusion/proto/proto/datafusion.proto | 8 +-
datafusion/proto/src/generated/pbjson.rs | 12 -
datafusion/proto/src/generated/prost.rs | 16 +-
datafusion/proto/src/logical_plan/from_proto.rs | 39 +-
datafusion/proto/src/logical_plan/to_proto.rs | 4 -
.../proto/tests/cases/roundtrip_logical_plan.rs | 31 +-
datafusion/sql/src/expr/value.rs | 15 +-
23 files changed, 961 insertions(+), 663 deletions(-)
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index b4af789682..dd2edb9b96 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1265,6 +1265,9 @@ name = "datafusion-functions-array"
version = "36.0.0"
dependencies = [
"arrow",
+ "arrow-array",
+ "arrow-buffer",
+ "arrow-schema",
"datafusion-common",
"datafusion-execution",
"datafusion-expr",
diff --git a/datafusion/expr/src/built_in_function.rs
b/datafusion/expr/src/built_in_function.rs
index 991963a1bc..6351e877df 100644
--- a/datafusion/expr/src/built_in_function.rs
+++ b/datafusion/expr/src/built_in_function.rs
@@ -17,14 +17,12 @@
//! Built-in functions module contains all the built-in functions definitions.
-use std::cmp::Ordering;
use std::collections::HashMap;
use std::fmt;
use std::str::FromStr;
use std::sync::{Arc, OnceLock};
use crate::signature::TIMEZONE_WILDCARD;
-use crate::type_coercion::binary::get_wider_type;
use crate::type_coercion::functions::data_types;
use crate::{FuncMonotonicity, Signature, TypeSignature, Volatility};
@@ -112,12 +110,8 @@ pub enum BuiltinScalarFunction {
Cot,
// array functions
- /// array_append
- ArrayAppend,
/// array_sort
ArraySort,
- /// array_concat
- ArrayConcat,
/// array_pop_front
ArrayPopFront,
/// array_pop_back
@@ -130,8 +124,6 @@ pub enum BuiltinScalarFunction {
ArrayPosition,
/// array_positions
ArrayPositions,
- /// array_prepend
- ArrayPrepend,
/// array_remove
ArrayRemove,
/// array_remove_n
@@ -158,8 +150,6 @@ pub enum BuiltinScalarFunction {
ArrayExcept,
/// array_resize
ArrayResize,
- /// construct an array from columns
- MakeArray,
// struct functions
/// struct
@@ -345,9 +335,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Tan => Volatility::Immutable,
BuiltinScalarFunction::Tanh => Volatility::Immutable,
BuiltinScalarFunction::Trunc => Volatility::Immutable,
- BuiltinScalarFunction::ArrayAppend => Volatility::Immutable,
BuiltinScalarFunction::ArraySort => Volatility::Immutable,
- BuiltinScalarFunction::ArrayConcat => Volatility::Immutable,
BuiltinScalarFunction::ArrayDistinct => Volatility::Immutable,
BuiltinScalarFunction::ArrayElement => Volatility::Immutable,
BuiltinScalarFunction::ArrayExcept => Volatility::Immutable,
@@ -355,7 +343,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArrayPopBack => Volatility::Immutable,
BuiltinScalarFunction::ArrayPosition => Volatility::Immutable,
BuiltinScalarFunction::ArrayPositions => Volatility::Immutable,
- BuiltinScalarFunction::ArrayPrepend => Volatility::Immutable,
BuiltinScalarFunction::ArrayRepeat => Volatility::Immutable,
BuiltinScalarFunction::ArrayRemove => Volatility::Immutable,
BuiltinScalarFunction::ArrayRemoveN => Volatility::Immutable,
@@ -368,7 +355,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArrayIntersect => Volatility::Immutable,
BuiltinScalarFunction::ArrayUnion => Volatility::Immutable,
BuiltinScalarFunction::ArrayResize => Volatility::Immutable,
- BuiltinScalarFunction::MakeArray => Volatility::Immutable,
BuiltinScalarFunction::Ascii => Volatility::Immutable,
BuiltinScalarFunction::BitLength => Volatility::Immutable,
BuiltinScalarFunction::Btrim => Volatility::Immutable,
@@ -426,25 +412,6 @@ impl BuiltinScalarFunction {
}
}
- /// Returns the dimension [`DataType`] of [`DataType::List`] if
- /// treated as a N-dimensional array.
- ///
- /// ## Examples:
- ///
- /// * `Int64` has dimension 1
- /// * `List(Int64)` has dimension 2
- /// * `List(List(Int64))` has dimension 3
- /// * etc.
- fn return_dimension(self, input_expr_type: &DataType) -> u64 {
- let mut result: u64 = 1;
- let mut current_data_type = input_expr_type;
- while let DataType::List(field) = current_data_type {
- current_data_type = field.data_type();
- result += 1;
- }
- result
- }
-
/// Returns the output [`DataType`] of this function
///
/// This method should be invoked only after `input_expr_types` have been
validated
@@ -463,38 +430,7 @@ impl BuiltinScalarFunction {
// the return type of the built in function.
// Some built-in functions' return type depends on the incoming type.
match self {
- BuiltinScalarFunction::ArrayAppend =>
Ok(input_expr_types[0].clone()),
BuiltinScalarFunction::ArraySort =>
Ok(input_expr_types[0].clone()),
- BuiltinScalarFunction::ArrayConcat => {
- let mut expr_type = Null;
- let mut max_dims = 0;
- for input_expr_type in input_expr_types {
- match input_expr_type {
- List(field) => {
- if !field.data_type().equals_datatype(&Null) {
- let dims =
self.return_dimension(input_expr_type);
- expr_type = match max_dims.cmp(&dims) {
- Ordering::Greater => expr_type,
- Ordering::Equal => {
- get_wider_type(&expr_type,
input_expr_type)?
- }
- Ordering::Less => {
- max_dims = dims;
- input_expr_type.clone()
- }
- };
- }
- }
- _ => {
- return plan_err!(
- "The {self} function can only accept list as
the args."
- );
- }
- }
- }
-
- Ok(expr_type)
- }
BuiltinScalarFunction::ArrayDistinct =>
Ok(input_expr_types[0].clone()),
BuiltinScalarFunction::ArrayElement => match &input_expr_types[0] {
List(field)
@@ -510,7 +446,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArrayPositions => {
Ok(List(Arc::new(Field::new("item", UInt64, true))))
}
- BuiltinScalarFunction::ArrayPrepend =>
Ok(input_expr_types[1].clone()),
BuiltinScalarFunction::ArrayRepeat => Ok(List(Arc::new(Field::new(
"item",
input_expr_types[0].clone(),
@@ -551,20 +486,6 @@ impl BuiltinScalarFunction {
(dt, _) => Ok(dt),
}
}
- BuiltinScalarFunction::MakeArray => match input_expr_types.len() {
- 0 => Ok(List(Arc::new(Field::new("item", Null, true)))),
- _ => {
- let mut expr_type = Null;
- for input_expr_type in input_expr_types {
- if !input_expr_type.equals_datatype(&Null) {
- expr_type = input_expr_type.clone();
- break;
- }
- }
-
- Ok(List(Arc::new(Field::new("item", expr_type, true))))
- }
- },
BuiltinScalarFunction::Ascii => Ok(Int32),
BuiltinScalarFunction::BitLength => {
utf8_to_int_type(&input_expr_types[0], "bit_length")
@@ -763,18 +684,8 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArraySort => {
Signature::variadic_any(self.volatility())
}
- BuiltinScalarFunction::ArrayAppend => {
- Signature::array_and_element(self.volatility())
- }
- BuiltinScalarFunction::MakeArray => {
- // 0 or more arguments of arbitrary type
- Signature::one_of(vec![VariadicEqual, Any(0)],
self.volatility())
- }
BuiltinScalarFunction::ArrayPopFront =>
Signature::array(self.volatility()),
BuiltinScalarFunction::ArrayPopBack =>
Signature::array(self.volatility()),
- BuiltinScalarFunction::ArrayConcat => {
- Signature::variadic_any(self.volatility())
- }
BuiltinScalarFunction::ArrayElement => {
Signature::array_and_index(self.volatility())
}
@@ -786,9 +697,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArrayPositions => {
Signature::array_and_element(self.volatility())
}
- BuiltinScalarFunction::ArrayPrepend => {
- Signature::element_and_array(self.volatility())
- }
BuiltinScalarFunction::ArrayRepeat => Signature::any(2,
self.volatility()),
BuiltinScalarFunction::ArrayRemove => {
Signature::array_and_element(self.volatility())
@@ -1213,17 +1121,7 @@ impl BuiltinScalarFunction {
// other functions
BuiltinScalarFunction::ArrowTypeof => &["arrow_typeof"],
- // array functions
- BuiltinScalarFunction::ArrayAppend => &[
- "array_append",
- "list_append",
- "array_push_back",
- "list_push_back",
- ],
BuiltinScalarFunction::ArraySort => &["array_sort", "list_sort"],
- BuiltinScalarFunction::ArrayConcat => {
- &["array_concat", "array_cat", "list_concat", "list_cat"]
- }
BuiltinScalarFunction::ArrayDistinct => &["array_distinct",
"list_distinct"],
BuiltinScalarFunction::ArrayElement => &[
"array_element",
@@ -1245,12 +1143,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArrayPositions => {
&["array_positions", "list_positions"]
}
- BuiltinScalarFunction::ArrayPrepend => &[
- "array_prepend",
- "list_prepend",
- "array_push_front",
- "list_push_front",
- ],
BuiltinScalarFunction::ArrayRepeat => &["array_repeat",
"list_repeat"],
BuiltinScalarFunction::ArrayRemove => &["array_remove",
"list_remove"],
BuiltinScalarFunction::ArrayRemoveN => &["array_remove_n",
"list_remove_n"],
@@ -1268,7 +1160,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArraySlice => &["array_slice",
"list_slice"],
BuiltinScalarFunction::ArrayUnion => &["array_union",
"list_union"],
BuiltinScalarFunction::ArrayResize => &["array_resize",
"list_resize"],
- BuiltinScalarFunction::MakeArray => &["make_array", "make_list"],
BuiltinScalarFunction::ArrayIntersect => {
&["array_intersect", "list_intersect"]
}
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index 1e934267e9..d1ae06d68f 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -586,14 +586,6 @@ scalar_expr!(
scalar_expr!(Uuid, uuid, , "returns uuid v4 as a string value");
scalar_expr!(Log, log, base x, "logarithm of a `x` for a particular `base`");
-// array functions
-scalar_expr!(
- ArrayAppend,
- array_append,
- array element,
- "appends an element to the end of an array."
-);
-
scalar_expr!(ArraySort, array_sort, array desc null_first, "returns sorted
array.");
scalar_expr!(
@@ -610,7 +602,6 @@ scalar_expr!(
"returns the array without the first element."
);
-nary_scalar_expr!(ArrayConcat, array_concat, "concatenates arrays.");
scalar_expr!(
ArrayElement,
array_element,
@@ -641,12 +632,6 @@ scalar_expr!(
array element,
"searches for an element in the array, returns all occurrences."
);
-scalar_expr!(
- ArrayPrepend,
- array_prepend,
- array element,
- "prepends an element to the beginning of an array."
-);
scalar_expr!(
ArrayRepeat,
array_repeat,
@@ -710,11 +695,6 @@ scalar_expr!(
"returns an array with the specified size filled with the given value."
);
-nary_scalar_expr!(
- MakeArray,
- array,
- "returns an Arrow array using the specified input expressions."
-);
scalar_expr!(
ArrayIntersect,
array_intersect,
@@ -1308,13 +1288,11 @@ mod test {
test_scalar_expr!(FromUnixtime, from_unixtime, unixtime);
- test_scalar_expr!(ArrayAppend, array_append, array, element);
test_scalar_expr!(ArraySort, array_sort, array, desc, null_first);
test_scalar_expr!(ArrayPopFront, array_pop_front, array);
test_scalar_expr!(ArrayPopBack, array_pop_back, array);
test_scalar_expr!(ArrayPosition, array_position, array, element,
index);
test_scalar_expr!(ArrayPositions, array_positions, array, element);
- test_scalar_expr!(ArrayPrepend, array_prepend, array, element);
test_scalar_expr!(ArrayRepeat, array_repeat, element, count);
test_scalar_expr!(ArrayRemove, array_remove, array, element);
test_scalar_expr!(ArrayRemoveN, array_remove_n, array, element, max);
@@ -1322,7 +1300,6 @@ mod test {
test_scalar_expr!(ArrayReplace, array_replace, array, from, to);
test_scalar_expr!(ArrayReplaceN, array_replace_n, array, from, to,
max);
test_scalar_expr!(ArrayReplaceAll, array_replace_all, array, from, to);
- test_nary_scalar_expr!(MakeArray, array, input);
test_unary_scalar_expr!(ArrowTypeof, arrow_typeof);
test_nary_scalar_expr!(OverLay, overlay, string, characters, position,
len);
diff --git a/datafusion/functions-array/Cargo.toml
b/datafusion/functions-array/Cargo.toml
index 17be817238..ba7d9e26ec 100644
--- a/datafusion/functions-array/Cargo.toml
+++ b/datafusion/functions-array/Cargo.toml
@@ -38,6 +38,9 @@ path = "src/lib.rs"
[dependencies]
arrow = { workspace = true }
+arrow-array = { workspace = true }
+arrow-buffer = { workspace = true }
+arrow-schema = { workspace = true }
datafusion-common = { workspace = true }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
diff --git a/datafusion/functions-array/src/concat.rs
b/datafusion/functions-array/src/concat.rs
new file mode 100644
index 0000000000..a8e7d1008f
--- /dev/null
+++ b/datafusion/functions-array/src/concat.rs
@@ -0,0 +1,436 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Includes `array append`, `array prepend`, and `array concat` functions
+
+use std::{any::Any, cmp::Ordering, sync::Arc};
+
+use arrow::array::{Capacities, MutableArrayData};
+use arrow_array::{Array, ArrayRef, GenericListArray, OffsetSizeTrait};
+use arrow_buffer::{BooleanBufferBuilder, NullBuffer, OffsetBuffer};
+use arrow_schema::{DataType, Field};
+use datafusion_common::Result;
+use datafusion_common::{
+ cast::as_generic_list_array, exec_err, not_impl_err, plan_err,
utils::list_ndims,
+};
+use datafusion_expr::expr::ScalarFunction;
+use datafusion_expr::Expr;
+use datafusion_expr::{
+ type_coercion::binary::get_wider_type, ColumnarValue, ScalarUDFImpl,
Signature,
+ Volatility,
+};
+
+use crate::utils::{align_array_dimensions, check_datatypes,
make_scalar_function};
+
+make_udf_function!(
+ ArrayAppend,
+ array_append,
+ array element, // arg name
+ "appends an element to the end of an array.", // doc
+ array_append_udf // internal function name
+);
+
+#[derive(Debug)]
+pub(super) struct ArrayAppend {
+ signature: Signature,
+ aliases: Vec<String>,
+}
+
+impl ArrayAppend {
+ pub fn new() -> Self {
+ Self {
+ signature: Signature::array_and_element(Volatility::Immutable),
+ aliases: vec![
+ String::from("array_append"),
+ String::from("list_append"),
+ String::from("array_push_back"),
+ String::from("list_push_back"),
+ ],
+ }
+ }
+}
+
+impl ScalarUDFImpl for ArrayAppend {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn name(&self) -> &str {
+ "array_append"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+ Ok(arg_types[0].clone())
+ }
+
+ fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+ make_scalar_function(array_append_inner)(args)
+ }
+
+ fn aliases(&self) -> &[String] {
+ &self.aliases
+ }
+}
+
+make_udf_function!(
+ ArrayPrepend,
+ array_prepend,
+ element array,
+ "Prepends an element to the beginning of an array.",
+ array_prepend_udf
+);
+
+#[derive(Debug)]
+pub(super) struct ArrayPrepend {
+ signature: Signature,
+ aliases: Vec<String>,
+}
+
+impl ArrayPrepend {
+ pub fn new() -> Self {
+ Self {
+ signature: Signature::element_and_array(Volatility::Immutable),
+ aliases: vec![
+ String::from("array_prepend"),
+ String::from("list_prepend"),
+ String::from("array_push_front"),
+ String::from("list_push_front"),
+ ],
+ }
+ }
+}
+
+impl ScalarUDFImpl for ArrayPrepend {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn name(&self) -> &str {
+ "array_prepend"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+ Ok(arg_types[1].clone())
+ }
+
+ fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+ make_scalar_function(array_prepend_inner)(args)
+ }
+
+ fn aliases(&self) -> &[String] {
+ &self.aliases
+ }
+}
+
+make_udf_function!(
+ ArrayConcat,
+ array_concat,
+ "Concatenates arrays.",
+ array_concat_udf
+);
+
+#[derive(Debug)]
+pub(super) struct ArrayConcat {
+ signature: Signature,
+ aliases: Vec<String>,
+}
+
+impl ArrayConcat {
+ pub fn new() -> Self {
+ Self {
+ signature: Signature::variadic_any(Volatility::Immutable),
+ aliases: vec![
+ String::from("array_concat"),
+ String::from("array_cat"),
+ String::from("list_concat"),
+ String::from("list_cat"),
+ ],
+ }
+ }
+}
+
+impl ScalarUDFImpl for ArrayConcat {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn name(&self) -> &str {
+ "array_concat"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+ let mut expr_type = DataType::Null;
+ let mut max_dims = 0;
+ for arg_type in arg_types {
+ match arg_type {
+ DataType::List(field) => {
+ if !field.data_type().equals_datatype(&DataType::Null) {
+ let dims = list_ndims(arg_type);
+ expr_type = match max_dims.cmp(&dims) {
+ Ordering::Greater => expr_type,
+ Ordering::Equal => get_wider_type(&expr_type,
arg_type)?,
+ Ordering::Less => {
+ max_dims = dims;
+ arg_type.clone()
+ }
+ };
+ }
+ }
+ _ => {
+ return plan_err!(
+ "The array_concat function can only accept list as the
args."
+ )
+ }
+ }
+ }
+
+ Ok(expr_type)
+ }
+
+ fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+ make_scalar_function(array_concat_inner)(args)
+ }
+
+ fn aliases(&self) -> &[String] {
+ &self.aliases
+ }
+}
+
+/// Array_concat/Array_cat SQL function
+pub(crate) fn array_concat_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
+    if args.is_empty() {
+        return exec_err!("array_concat expects at least one argument");
+    }
+
+ let mut new_args = vec![];
+ for arg in args {
+ let ndim = list_ndims(arg.data_type());
+ let base_type = datafusion_common::utils::base_type(arg.data_type());
+ if ndim == 0 {
+ return not_impl_err!("Array is not type '{base_type:?}'.");
+ }
+ if !base_type.eq(&DataType::Null) {
+ new_args.push(arg.clone());
+ }
+ }
+
+ match &args[0].data_type() {
+ DataType::LargeList(_) => concat_internal::<i64>(new_args.as_slice()),
+ _ => concat_internal::<i32>(new_args.as_slice()),
+ }
+}
+
+fn concat_internal<O: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> {
+ let args = align_array_dimensions::<O>(args.to_vec())?;
+
+ let list_arrays = args
+ .iter()
+ .map(|arg| as_generic_list_array::<O>(arg))
+ .collect::<Result<Vec<_>>>()?;
+ // Assume number of rows is the same for all arrays
+ let row_count = list_arrays[0].len();
+
+ let mut array_lengths = vec![];
+ let mut arrays = vec![];
+ let mut valid = BooleanBufferBuilder::new(row_count);
+ for i in 0..row_count {
+ let nulls = list_arrays
+ .iter()
+ .map(|arr| arr.is_null(i))
+ .collect::<Vec<_>>();
+
+ // If all the arrays are null, the concatenated array is null
+ let is_null = nulls.iter().all(|&x| x);
+ if is_null {
+ array_lengths.push(0);
+ valid.append(false);
+ } else {
+ // Get all the arrays on i-th row
+ let values = list_arrays
+ .iter()
+ .map(|arr| arr.value(i))
+ .collect::<Vec<_>>();
+
+ let elements = values
+ .iter()
+ .map(|a| a.as_ref())
+ .collect::<Vec<&dyn Array>>();
+
+ // Concatenated array on i-th row
+ let concated_array = arrow::compute::concat(elements.as_slice())?;
+ array_lengths.push(concated_array.len());
+ arrays.push(concated_array);
+ valid.append(true);
+ }
+ }
+ // Assume all arrays have the same data type
+ let data_type = list_arrays[0].value_type();
+ let buffer = valid.finish();
+
+ let elements = arrays
+ .iter()
+ .map(|a| a.as_ref())
+ .collect::<Vec<&dyn Array>>();
+
+ let list_arr = GenericListArray::<O>::new(
+ Arc::new(Field::new("item", data_type, true)),
+ OffsetBuffer::from_lengths(array_lengths),
+ Arc::new(arrow::compute::concat(elements.as_slice())?),
+ Some(NullBuffer::new(buffer)),
+ );
+
+ Ok(Arc::new(list_arr))
+}
+
+/// Kernel functions
+
+/// Array_append SQL function
+pub(crate) fn array_append_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
+ if args.len() != 2 {
+ return exec_err!("array_append expects two arguments");
+ }
+
+ match args[0].data_type() {
+ DataType::LargeList(_) => general_append_and_prepend::<i64>(args,
true),
+ _ => general_append_and_prepend::<i32>(args, true),
+ }
+}
+
+/// Array_prepend SQL function
+pub(crate) fn array_prepend_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
+ if args.len() != 2 {
+ return exec_err!("array_prepend expects two arguments");
+ }
+
+ match args[1].data_type() {
+ DataType::LargeList(_) => general_append_and_prepend::<i64>(args,
false),
+ _ => general_append_and_prepend::<i32>(args, false),
+ }
+}
+
+fn general_append_and_prepend<O: OffsetSizeTrait>(
+ args: &[ArrayRef],
+ is_append: bool,
+) -> Result<ArrayRef>
+where
+ i64: TryInto<O>,
+{
+ let (list_array, element_array) = if is_append {
+ let list_array = as_generic_list_array::<O>(&args[0])?;
+ let element_array = &args[1];
+ check_datatypes("array_append", &[element_array,
list_array.values()])?;
+ (list_array, element_array)
+ } else {
+ let list_array = as_generic_list_array::<O>(&args[1])?;
+ let element_array = &args[0];
+ check_datatypes("array_prepend", &[list_array.values(),
element_array])?;
+ (list_array, element_array)
+ };
+
+ let res = match list_array.value_type() {
+ DataType::List(_) => concat_internal::<i32>(args)?,
+ DataType::LargeList(_) => concat_internal::<i64>(args)?,
+ data_type => {
+ return generic_append_and_prepend::<O>(
+ list_array,
+ element_array,
+ &data_type,
+ is_append,
+ );
+ }
+ };
+
+ Ok(res)
+}
+
+/// Appends or prepends elements to a ListArray.
+///
+/// This function takes a ListArray, an ArrayRef, a FieldRef, and a boolean
flag
+/// indicating whether to append or prepend the elements. It returns a
`Result<ArrayRef>`
+/// representing the resulting ListArray after the operation.
+///
+/// # Arguments
+///
+/// * `list_array` - A reference to the ListArray to which elements will be
appended/prepended.
+/// * `element_array` - A reference to the Array containing elements to be
appended/prepended.
+/// * `field` - A reference to the Field describing the data type of the
arrays.
+/// * `is_append` - A boolean flag indicating whether to append (`true`) or
prepend (`false`) elements.
+///
+/// # Examples
+///
+/// generic_append_and_prepend(
+/// [1, 2, 3], 4, append => [1, 2, 3, 4]
+/// 5, [6, 7, 8], prepend => [5, 6, 7, 8]
+/// )
+fn generic_append_and_prepend<O: OffsetSizeTrait>(
+ list_array: &GenericListArray<O>,
+ element_array: &ArrayRef,
+ data_type: &DataType,
+ is_append: bool,
+) -> Result<ArrayRef>
+where
+ i64: TryInto<O>,
+{
+ let mut offsets = vec![O::usize_as(0)];
+ let values = list_array.values();
+ let original_data = values.to_data();
+ let element_data = element_array.to_data();
+ let capacity = Capacities::Array(original_data.len() + element_data.len());
+
+ let mut mutable = MutableArrayData::with_capacities(
+ vec![&original_data, &element_data],
+ false,
+ capacity,
+ );
+
+ let values_index = 0;
+ let element_index = 1;
+
+ for (row_index, offset_window) in
list_array.offsets().windows(2).enumerate() {
+ let start = offset_window[0].to_usize().unwrap();
+ let end = offset_window[1].to_usize().unwrap();
+ if is_append {
+ mutable.extend(values_index, start, end);
+ mutable.extend(element_index, row_index, row_index + 1);
+ } else {
+ mutable.extend(element_index, row_index, row_index + 1);
+ mutable.extend(values_index, start, end);
+ }
+ offsets.push(offsets[row_index] + O::usize_as(end - start + 1));
+ }
+
+ let data = mutable.freeze();
+
+ Ok(Arc::new(GenericListArray::<O>::try_new(
+ Arc::new(Field::new("item", data_type.to_owned(), true)),
+ OffsetBuffer::new(offsets.into()),
+ arrow_array::make_array(data),
+ None,
+ )?))
+}
diff --git a/datafusion/functions-array/src/kernels.rs
b/datafusion/functions-array/src/kernels.rs
index ad96d232aa..bb5c4ef53e 100644
--- a/datafusion/functions-array/src/kernels.rs
+++ b/datafusion/functions-array/src/kernels.rs
@@ -31,9 +31,11 @@ use datafusion_common::cast::{
as_date32_array, as_generic_list_array, as_int64_array,
as_interval_mdn_array,
as_large_list_array, as_list_array, as_null_array, as_string_array,
};
-use datafusion_common::{exec_err, not_impl_datafusion_err, DataFusionError,
Result};
+use datafusion_common::DataFusionError;
+use datafusion_common::{exec_err, not_impl_datafusion_err, Result};
use std::any::type_name;
use std::sync::Arc;
+
macro_rules! downcast_arg {
($ARG:expr, $ARRAY_TYPE:ident) => {{
$ARG.as_any().downcast_ref::<$ARRAY_TYPE>().ok_or_else(|| {
diff --git a/datafusion/functions-array/src/lib.rs
b/datafusion/functions-array/src/lib.rs
index 73055966ee..cf1e35d608 100644
--- a/datafusion/functions-array/src/lib.rs
+++ b/datafusion/functions-array/src/lib.rs
@@ -29,7 +29,9 @@
pub mod macros;
mod array_has;
+mod concat;
mod kernels;
+mod make_array;
mod udf;
mod utils;
@@ -44,6 +46,10 @@ pub mod expr_fn {
pub use super::array_has::array_has;
pub use super::array_has::array_has_all;
pub use super::array_has::array_has_any;
+ pub use super::concat::array_append;
+ pub use super::concat::array_concat;
+ pub use super::concat::array_prepend;
+ pub use super::make_array::make_array;
pub use super::udf::array_dims;
pub use super::udf::array_empty;
pub use super::udf::array_length;
@@ -64,6 +70,10 @@ pub fn register_all(registry: &mut dyn FunctionRegistry) ->
Result<()> {
udf::array_dims_udf(),
udf::cardinality_udf(),
udf::array_ndims_udf(),
+ concat::array_append_udf(),
+ concat::array_prepend_udf(),
+ concat::array_concat_udf(),
+ make_array::make_array_udf(),
array_has::array_has_udf(),
array_has::array_has_all_udf(),
array_has::array_has_any_udf(),
diff --git a/datafusion/functions-array/src/macros.rs
b/datafusion/functions-array/src/macros.rs
index c503fde05b..c49f5830b8 100644
--- a/datafusion/functions-array/src/macros.rs
+++ b/datafusion/functions-array/src/macros.rs
@@ -76,4 +76,34 @@ macro_rules! make_udf_function {
}
}
};
+ ($UDF:ty, $EXPR_FN:ident, $DOC:expr , $SCALAR_UDF_FN:ident) => {
+ paste::paste! {
+ // "fluent expr_fn" style function
+ #[doc = $DOC]
+ pub fn $EXPR_FN(arg: Vec<Expr>) -> Expr {
+ Expr::ScalarFunction(ScalarFunction::new_udf(
+ $SCALAR_UDF_FN(),
+ arg,
+ ))
+ }
+
+ /// Singleton instance of [`$UDF`], ensures the UDF is only
created once
+ /// named STATIC_$(UDF). For example `STATIC_ArrayToString`
+ #[allow(non_upper_case_globals)]
+ static [< STATIC_ $UDF >]:
std::sync::OnceLock<std::sync::Arc<datafusion_expr::ScalarUDF>> =
+ std::sync::OnceLock::new();
+ /// ScalarFunction that returns a [`ScalarUDF`] for [`$UDF`]
+ ///
+ /// [`ScalarUDF`]: datafusion_expr::ScalarUDF
+ pub fn $SCALAR_UDF_FN() ->
std::sync::Arc<datafusion_expr::ScalarUDF> {
+ [< STATIC_ $UDF >]
+ .get_or_init(|| {
+
std::sync::Arc::new(datafusion_expr::ScalarUDF::new_from_impl(
+ <$UDF>::new(),
+ ))
+ })
+ .clone()
+ }
+ }
+ };
}
diff --git a/datafusion/functions-array/src/make_array.rs
b/datafusion/functions-array/src/make_array.rs
new file mode 100644
index 0000000000..a371ea767b
--- /dev/null
+++ b/datafusion/functions-array/src/make_array.rs
@@ -0,0 +1,221 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// core array function like `make_array`
+
+use std::{any::Any, sync::Arc};
+
+use arrow::array::{ArrayData, Capacities, MutableArrayData};
+use arrow_array::{
+ new_null_array, Array, ArrayRef, GenericListArray, NullArray,
OffsetSizeTrait,
+};
+use arrow_buffer::OffsetBuffer;
+use arrow_schema::{DataType, Field};
+use datafusion_common::Result;
+use datafusion_common::{plan_err, utils::array_into_list_array};
+use datafusion_expr::expr::ScalarFunction;
+use datafusion_expr::Expr;
+use datafusion_expr::{
+ ColumnarValue, ScalarUDFImpl, Signature, TypeSignature, Volatility,
+};
+
+use crate::utils::make_scalar_function;
+
+make_udf_function!(
+ MakeArray,
+ make_array,
+ "Returns an Arrow array using the specified input expressions.",
+ make_array_udf
+);
+
+#[derive(Debug)]
+pub struct MakeArray {
+ signature: Signature,
+ aliases: Vec<String>,
+}
+
+impl MakeArray {
+ pub fn new() -> Self {
+ Self {
+ signature: Signature::one_of(
+ vec![TypeSignature::VariadicEqual, TypeSignature::Any(0)],
+ Volatility::Immutable,
+ ),
+ aliases: vec![String::from("make_array"),
String::from("make_list")],
+ }
+ }
+}
+
+impl ScalarUDFImpl for MakeArray {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn name(&self) -> &str {
+ "make_array"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn return_type(&self, arg_types: &[DataType]) ->
datafusion_common::Result<DataType> {
+ match arg_types.len() {
+ 0 => Ok(DataType::List(Arc::new(Field::new(
+ "item",
+ DataType::Null,
+ true,
+ )))),
+ _ => {
+ let mut expr_type = DataType::Null;
+ for arg_type in arg_types {
+ if !arg_type.equals_datatype(&DataType::Null) {
+ expr_type = arg_type.clone();
+ break;
+ }
+ }
+
+ Ok(DataType::List(Arc::new(Field::new(
+ "item", expr_type, true,
+ ))))
+ }
+ }
+ }
+
+ fn invoke(&self, args: &[ColumnarValue]) ->
datafusion_common::Result<ColumnarValue> {
+ make_scalar_function(make_array_inner)(args)
+ }
+
+ fn aliases(&self) -> &[String] {
+ &self.aliases
+ }
+}
+
+/// `make_array` SQL function
+pub(crate) fn make_array_inner(arrays: &[ArrayRef]) -> Result<ArrayRef> {
+ let mut data_type = DataType::Null;
+ for arg in arrays {
+ let arg_data_type = arg.data_type();
+ if !arg_data_type.equals_datatype(&DataType::Null) {
+ data_type = arg_data_type.clone();
+ break;
+ }
+ }
+
+ match data_type {
+ // Either an empty array or all nulls:
+ DataType::Null => {
+ let array =
+ new_null_array(&DataType::Null, arrays.iter().map(|a|
a.len()).sum());
+ Ok(Arc::new(array_into_list_array(array)))
+ }
+ DataType::LargeList(..) => array_array::<i64>(arrays, data_type),
+ _ => array_array::<i32>(arrays, data_type),
+ }
+}
+
+/// Convert one or more [`ArrayRef`] of the same type into a
+/// `ListArray` or 'LargeListArray' depending on the offset size.
+///
+/// # Example (non nested)
+///
+/// Calling `array(col1, col2)` where col1 and col2 are non nested
+/// would return a single new `ListArray`, where each row was a list
+/// of 2 elements:
+///
+/// ```text
+/// ┌─────────┐ ┌─────────┐ ┌──────────────┐
+/// │ ┌─────┐ │ │ ┌─────┐ │ │ ┌──────────┐ │
+/// │ │ A │ │ │ │ X │ │ │ │ [A, X] │ │
+/// │ ├─────┤ │ │ ├─────┤ │ │ ├──────────┤ │
+/// │ │NULL │ │ │ │ Y │ │──────────▶│ │[NULL, Y] │ │
+/// │ ├─────┤ │ │ ├─────┤ │ │ ├──────────┤ │
+/// │ │ C │ │ │ │ Z │ │ │ │ [C, Z] │ │
+/// │ └─────┘ │ │ └─────┘ │ │ └──────────┘ │
+/// └─────────┘ └─────────┘ └──────────────┘
+/// col1 col2 output
+/// ```
+///
+/// # Example (nested)
+///
+/// Calling `array(col1, col2)` where col1 and col2 are lists
+/// would return a single new `ListArray`, where each row was a list
+/// of the corresponding elements of col1 and col2.
+///
+/// ``` text
+/// ┌──────────────┐ ┌──────────────┐ ┌─────────────────────────────┐
+/// │ ┌──────────┐ │ │ ┌──────────┐ │ │ ┌────────────────────────┐ │
+/// │ │ [A, X] │ │ │ │ [] │ │ │ │ [[A, X], []] │ │
+/// │ ├──────────┤ │ │ ├──────────┤ │ │ ├────────────────────────┤ │
+/// │ │[NULL, Y] │ │ │ │[Q, R, S] │ │───────▶│ │ [[NULL, Y], [Q, R, S]] │ │
+/// │ ├──────────┤ │ │ ├──────────┤ │ │ ├────────────────────────│ │
+/// │ │ [C, Z] │ │ │ │ NULL │ │ │ │ [[C, Z], NULL] │ │
+/// │ └──────────┘ │ │ └──────────┘ │ │ └────────────────────────┘ │
+/// └──────────────┘ └──────────────┘ └─────────────────────────────┘
+/// col1 col2 output
+/// ```
+fn array_array<O: OffsetSizeTrait>(
+ args: &[ArrayRef],
+ data_type: DataType,
+) -> Result<ArrayRef> {
+ // do not accept 0 arguments.
+ if args.is_empty() {
+ return plan_err!("Array requires at least one argument");
+ }
+
+ let mut data = vec![];
+ let mut total_len = 0;
+ for arg in args {
+ let arg_data = if arg.as_any().is::<NullArray>() {
+ ArrayData::new_empty(&data_type)
+ } else {
+ arg.to_data()
+ };
+ total_len += arg_data.len();
+ data.push(arg_data);
+ }
+
+ let mut offsets: Vec<O> = Vec::with_capacity(total_len);
+ offsets.push(O::usize_as(0));
+
+ let capacity = Capacities::Array(total_len);
+ let data_ref = data.iter().collect::<Vec<_>>();
+ let mut mutable = MutableArrayData::with_capacities(data_ref, true,
capacity);
+
+ let num_rows = args[0].len();
+ for row_idx in 0..num_rows {
+ for (arr_idx, arg) in args.iter().enumerate() {
+ if !arg.as_any().is::<NullArray>()
+ && !arg.is_null(row_idx)
+ && arg.is_valid(row_idx)
+ {
+ mutable.extend(arr_idx, row_idx, row_idx + 1);
+ } else {
+ mutable.extend_nulls(1);
+ }
+ }
+ offsets.push(O::usize_as(mutable.len()));
+ }
+ let data = mutable.freeze();
+
+ Ok(Arc::new(GenericListArray::<O>::try_new(
+ Arc::new(Field::new("item", data_type, true)),
+ OffsetBuffer::new(offsets.into()),
+ arrow_array::make_array(data),
+ None,
+ )?))
+}
diff --git a/datafusion/functions-array/src/udf.rs
b/datafusion/functions-array/src/udf.rs
index b2c310e170..854535c237 100644
--- a/datafusion/functions-array/src/udf.rs
+++ b/datafusion/functions-array/src/udf.rs
@@ -25,7 +25,7 @@ use datafusion_common::plan_err;
use datafusion_common::Result;
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::Expr;
-use datafusion_expr::TypeSignature::Exact;
+use datafusion_expr::TypeSignature;
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
use std::any::Any;
use std::sync::Arc;
@@ -107,10 +107,10 @@ impl Range {
Self {
signature: Signature::one_of(
vec![
- Exact(vec![Int64]),
- Exact(vec![Int64, Int64]),
- Exact(vec![Int64, Int64, Int64]),
- Exact(vec![Date32, Date32, Interval(MonthDayNano)]),
+ TypeSignature::Exact(vec![Int64]),
+ TypeSignature::Exact(vec![Int64, Int64]),
+ TypeSignature::Exact(vec![Int64, Int64, Int64]),
+ TypeSignature::Exact(vec![Date32, Date32,
Interval(MonthDayNano)]),
],
Volatility::Immutable,
),
@@ -177,10 +177,10 @@ impl GenSeries {
Self {
signature: Signature::one_of(
vec![
- Exact(vec![Int64]),
- Exact(vec![Int64, Int64]),
- Exact(vec![Int64, Int64, Int64]),
- Exact(vec![Date32, Date32, Interval(MonthDayNano)]),
+ TypeSignature::Exact(vec![Int64]),
+ TypeSignature::Exact(vec![Int64, Int64]),
+ TypeSignature::Exact(vec![Int64, Int64, Int64]),
+ TypeSignature::Exact(vec![Date32, Date32,
Interval(MonthDayNano)]),
],
Volatility::Immutable,
),
diff --git a/datafusion/functions-array/src/utils.rs
b/datafusion/functions-array/src/utils.rs
index d374a9f66b..3a6bb723c1 100644
--- a/datafusion/functions-array/src/utils.rs
+++ b/datafusion/functions-array/src/utils.rs
@@ -17,8 +17,14 @@
//! array function utils
+use std::sync::Arc;
+
use arrow::{array::ArrayRef, datatypes::DataType};
-use datafusion_common::{plan_err, Result};
+use arrow_array::{GenericListArray, OffsetSizeTrait};
+use arrow_buffer::OffsetBuffer;
+use arrow_schema::Field;
+use datafusion_common::{plan_err, Result, ScalarValue};
+use datafusion_expr::{ColumnarValue, ScalarFunctionImplementation};
pub(crate) fn check_datatypes(name: &str, args: &[&ArrayRef]) -> Result<()> {
let data_type = args[0].data_type();
@@ -32,3 +38,124 @@ pub(crate) fn check_datatypes(name: &str, args:
&[&ArrayRef]) -> Result<()> {
Ok(())
}
+
+pub(crate) fn make_scalar_function<F>(inner: F) -> ScalarFunctionImplementation
+where
+ F: Fn(&[ArrayRef]) -> Result<ArrayRef> + Sync + Send + 'static,
+{
+ Arc::new(move |args: &[ColumnarValue]| {
+ // first, identify if any of the arguments is an Array. If yes, store
its `len`,
+ // as any scalar will need to be converted to an array of len `len`.
+ let len = args
+ .iter()
+ .fold(Option::<usize>::None, |acc, arg| match arg {
+ ColumnarValue::Scalar(_) => acc,
+ ColumnarValue::Array(a) => Some(a.len()),
+ });
+
+ let is_scalar = len.is_none();
+
+ let args = ColumnarValue::values_to_arrays(args)?;
+
+ let result = (inner)(&args);
+
+ if is_scalar {
+ // If all inputs are scalar, keeps output as scalar
+ let result = result.and_then(|arr|
ScalarValue::try_from_array(&arr, 0));
+ result.map(ColumnarValue::Scalar)
+ } else {
+ result.map(ColumnarValue::Array)
+ }
+ })
+}
+
+pub(crate) fn align_array_dimensions<O: OffsetSizeTrait>(
+ args: Vec<ArrayRef>,
+) -> Result<Vec<ArrayRef>> {
+ let args_ndim = args
+ .iter()
+ .map(|arg| datafusion_common::utils::list_ndims(arg.data_type()))
+ .collect::<Vec<_>>();
+ let max_ndim = args_ndim.iter().max().unwrap_or(&0);
+
+ // Align the dimensions of the arrays
+ let aligned_args: Result<Vec<ArrayRef>> = args
+ .into_iter()
+ .zip(args_ndim.iter())
+ .map(|(array, ndim)| {
+ if ndim < max_ndim {
+ let mut aligned_array = array.clone();
+ for _ in 0..(max_ndim - ndim) {
+ let data_type = aligned_array.data_type().to_owned();
+ let array_lengths = vec![1; aligned_array.len()];
+ let offsets =
OffsetBuffer::<O>::from_lengths(array_lengths);
+
+ aligned_array = Arc::new(GenericListArray::<O>::try_new(
+ Arc::new(Field::new("item", data_type, true)),
+ offsets,
+ aligned_array,
+ None,
+ )?)
+ }
+ Ok(aligned_array)
+ } else {
+ Ok(array.clone())
+ }
+ })
+ .collect();
+
+ aligned_args
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use arrow::datatypes::Int64Type;
+ use arrow_array::ListArray;
+ use datafusion_common::{cast::as_list_array, utils::array_into_list_array};
+
+ /// Only test internal functions, array-related sql functions will be
tested in sqllogictest `array.slt`
+ #[test]
+ fn test_align_array_dimensions() {
+ let array1d_1 =
+ Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
+ Some(vec![Some(1), Some(2), Some(3)]),
+ Some(vec![Some(4), Some(5)]),
+ ]));
+ let array1d_2 =
+ Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
+ Some(vec![Some(6), Some(7), Some(8)]),
+ ]));
+
+ let array2d_1 = Arc::new(array_into_list_array(array1d_1.clone())) as
ArrayRef;
+ let array2d_2 = Arc::new(array_into_list_array(array1d_2.clone())) as
ArrayRef;
+
+ let res = align_array_dimensions::<i32>(vec![
+ array1d_1.to_owned(),
+ array2d_2.to_owned(),
+ ])
+ .unwrap();
+
+ let expected = as_list_array(&array2d_1).unwrap();
+ let expected_dim =
datafusion_common::utils::list_ndims(array2d_1.data_type());
+ assert_ne!(as_list_array(&res[0]).unwrap(), expected);
+ assert_eq!(
+ datafusion_common::utils::list_ndims(res[0].data_type()),
+ expected_dim
+ );
+
+ let array3d_1 = Arc::new(array_into_list_array(array2d_1)) as ArrayRef;
+ let array3d_2 = array_into_list_array(array2d_2.to_owned());
+ let res =
+ align_array_dimensions::<i32>(vec![array1d_1,
Arc::new(array3d_2.clone())])
+ .unwrap();
+
+ let expected = as_list_array(&array3d_1).unwrap();
+ let expected_dim =
datafusion_common::utils::list_ndims(array3d_1.data_type());
+ assert_ne!(as_list_array(&res[0]).unwrap(), expected);
+ assert_eq!(
+ datafusion_common::utils::list_ndims(res[0].data_type()),
+ expected_dim
+ );
+ }
+}
diff --git a/datafusion/optimizer/src/analyzer/rewrite_expr.rs
b/datafusion/optimizer/src/analyzer/rewrite_expr.rs
index 6f856fa9bd..99578e9118 100644
--- a/datafusion/optimizer/src/analyzer/rewrite_expr.rs
+++ b/datafusion/optimizer/src/analyzer/rewrite_expr.rs
@@ -17,23 +17,27 @@
//! Analyzer rule for to replace operators with function calls (e.g `||` to
array_concat`)
+#[cfg(feature = "array_expressions")]
use std::sync::Arc;
use super::AnalyzerRule;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNodeRewriter};
-use datafusion_common::utils::list_ndims;
-use datafusion_common::{DFSchema, DFSchemaRef, Result};
+#[cfg(feature = "array_expressions")]
+use datafusion_common::{utils::list_ndims, DFSchemaRef};
+use datafusion_common::{DFSchema, Result};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::expr_rewriter::rewrite_preserving_name;
use datafusion_expr::utils::merge_schema;
use datafusion_expr::BuiltinScalarFunction;
use datafusion_expr::GetFieldAccess;
use datafusion_expr::GetIndexedField;
-use datafusion_expr::Operator;
-use datafusion_expr::ScalarFunctionDefinition;
-use datafusion_expr::{BinaryExpr, Expr, LogicalPlan};
+#[cfg(feature = "array_expressions")]
+use datafusion_expr::{BinaryExpr, Operator, ScalarFunctionDefinition};
+use datafusion_expr::{Expr, LogicalPlan};
+#[cfg(feature = "array_expressions")]
+use datafusion_functions_array::expr_fn::{array_append, array_concat,
array_prepend};
#[derive(Default)]
pub struct OperatorToFunction {}
@@ -73,6 +77,7 @@ fn analyze_internal(plan: &LogicalPlan) ->
Result<LogicalPlan> {
}
let mut expr_rewrite = OperatorToFunctionRewriter {
+ #[cfg(feature = "array_expressions")]
schema: Arc::new(schema),
};
@@ -90,6 +95,7 @@ fn analyze_internal(plan: &LogicalPlan) ->
Result<LogicalPlan> {
}
pub(crate) struct OperatorToFunctionRewriter {
+ #[cfg(feature = "array_expressions")]
pub(crate) schema: DFSchemaRef,
}
@@ -97,13 +103,14 @@ impl TreeNodeRewriter for OperatorToFunctionRewriter {
type Node = Expr;
fn f_up(&mut self, expr: Expr) -> Result<Transformed<Expr>> {
+ #[cfg(feature = "array_expressions")]
if let Expr::BinaryExpr(BinaryExpr {
ref left,
op,
ref right,
}) = expr
{
- if let Some(fun) =
rewrite_array_concat_operator_to_func_for_column(
+ if let Some(expr) =
rewrite_array_concat_operator_to_func_for_column(
left.as_ref(),
op,
right.as_ref(),
@@ -113,12 +120,7 @@ impl TreeNodeRewriter for OperatorToFunctionRewriter {
rewrite_array_concat_operator_to_func(left.as_ref(), op,
right.as_ref())
}) {
// Convert &Box<Expr> -> Expr
- let left = (**left).clone();
- let right = (**right).clone();
- return Ok(Transformed::yes(Expr::ScalarFunction(ScalarFunction
{
- func_def: ScalarFunctionDefinition::BuiltIn(fun),
- args: vec![left, right],
- })));
+ return Ok(Transformed::yes(expr));
}
// TODO: change OperatorToFunction to OperatoToArrayFunction and
configure it with array_expressions feature
@@ -185,16 +187,14 @@ fn rewrite_array_has_all_operator_to_func(
// array1 <@ array2 -> array_has_all(array2, array1)
(
Expr::ScalarFunction(ScalarFunction {
- func_def:
-
ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::MakeArray),
+ func_def: ScalarFunctionDefinition::UDF(left_fun),
args: _left_args,
}),
Expr::ScalarFunction(ScalarFunction {
- func_def:
-
ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::MakeArray),
+ func_def: ScalarFunctionDefinition::UDF(right_fun),
args: _right_args,
}),
- ) => {
+ ) if left_fun.name() == "make_array" && right_fun.name() ==
"make_array" => {
let left = left.clone();
let right = right.clone();
@@ -220,11 +220,12 @@ fn rewrite_array_has_all_operator_to_func(
/// 4) (arry concat, array append, array prepend) || array -> array concat
///
/// 5) (arry concat, array append, array prepend) || scalar -> array append
+#[cfg(feature = "array_expressions")]
fn rewrite_array_concat_operator_to_func(
left: &Expr,
op: Operator,
right: &Expr,
-) -> Option<BuiltinScalarFunction> {
+) -> Option<Expr> {
// Convert `Array StringConcat Array` to ScalarFunction::ArrayConcat
if op != Operator::StringConcat {
@@ -236,97 +237,65 @@ fn rewrite_array_concat_operator_to_func(
// (arry concat, array append, array prepend) || array -> array concat
(
Expr::ScalarFunction(ScalarFunction {
- func_def:
-
ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::ArrayConcat),
- args: _left_args,
- }),
- Expr::ScalarFunction(ScalarFunction {
- func_def:
-
ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::MakeArray),
- args: _right_args,
- }),
- )
- | (
- Expr::ScalarFunction(ScalarFunction {
- func_def:
-
ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::ArrayAppend),
+ func_def: ScalarFunctionDefinition::UDF(left_fun),
args: _left_args,
}),
Expr::ScalarFunction(ScalarFunction {
- func_def:
-
ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::MakeArray),
+ func_def: ScalarFunctionDefinition::UDF(right_fun),
args: _right_args,
}),
- )
- | (
- Expr::ScalarFunction(ScalarFunction {
- func_def:
-
ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::ArrayPrepend),
- args: _left_args,
- }),
- Expr::ScalarFunction(ScalarFunction {
- func_def:
-
ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::MakeArray),
- args: _right_args,
- }),
- ) => Some(BuiltinScalarFunction::ArrayConcat),
+ ) if ["array_append", "array_prepend", "array_concat"]
+ .contains(&left_fun.name())
+ && right_fun.name() == "make_array" =>
+ {
+ Some(array_concat(vec![left.clone(), right.clone()]))
+ }
// Chain concat operator (a || b) || scalar,
// (arry concat, array append, array prepend) || scalar -> array append
(
Expr::ScalarFunction(ScalarFunction {
- func_def:
-
ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::ArrayConcat),
- args: _left_args,
- }),
- _scalar,
- )
- | (
- Expr::ScalarFunction(ScalarFunction {
- func_def:
-
ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::ArrayAppend),
+ func_def: ScalarFunctionDefinition::UDF(left_fun),
args: _left_args,
}),
_scalar,
- )
- | (
- Expr::ScalarFunction(ScalarFunction {
- func_def:
-
ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::ArrayPrepend),
- args: _left_args,
- }),
- _scalar,
- ) => Some(BuiltinScalarFunction::ArrayAppend),
+ ) if ["array_append", "array_prepend", "array_concat"]
+ .contains(&left_fun.name()) =>
+ {
+ Some(array_append(left.clone(), right.clone()))
+ }
// array || array -> array concat
(
Expr::ScalarFunction(ScalarFunction {
- func_def:
-
ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::MakeArray),
+ func_def: ScalarFunctionDefinition::UDF(left_fun),
args: _left_args,
}),
Expr::ScalarFunction(ScalarFunction {
- func_def:
-
ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::MakeArray),
+ func_def: ScalarFunctionDefinition::UDF(right_fun),
args: _right_args,
}),
- ) => Some(BuiltinScalarFunction::ArrayConcat),
+ ) if left_fun.name() == "make_array" && right_fun.name() ==
"make_array" => {
+ Some(array_concat(vec![left.clone(), right.clone()]))
+ }
// array || scalar -> array append
(
Expr::ScalarFunction(ScalarFunction {
- func_def:
-
ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::MakeArray),
+ func_def: ScalarFunctionDefinition::UDF(left_fun),
args: _left_args,
}),
_right_scalar,
- ) => Some(BuiltinScalarFunction::ArrayAppend),
+ ) if left_fun.name() == "make_array" => {
+ Some(array_append(left.clone(), right.clone()))
+ }
// scalar || array -> array prepend
(
_left_scalar,
Expr::ScalarFunction(ScalarFunction {
- func_def:
-
ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::MakeArray),
+ func_def: ScalarFunctionDefinition::UDF(right_fun),
args: _right_args,
}),
- ) => Some(BuiltinScalarFunction::ArrayPrepend),
+ ) if right_fun.name() == "make_array" => {
+ Some(array_prepend(left.clone(), right.clone()))
+ }
_ => None,
}
@@ -337,12 +306,13 @@ fn rewrite_array_concat_operator_to_func(
/// 1) (arry concat, array append, array prepend) || column -> (array append,
array concat)
///
/// 2) column1 || column2 -> (array prepend, array append, array concat)
+#[cfg(feature = "array_expressions")]
fn rewrite_array_concat_operator_to_func_for_column(
left: &Expr,
op: Operator,
right: &Expr,
schema: &DFSchema,
-) -> Result<Option<BuiltinScalarFunction>> {
+) -> Result<Option<Expr>> {
if op != Operator::StringConcat {
return Ok(None);
}
@@ -352,33 +322,18 @@ fn rewrite_array_concat_operator_to_func_for_column(
// 1) array_prepend/append/concat || column
(
Expr::ScalarFunction(ScalarFunction {
- func_def:
-
ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::ArrayPrepend),
- args: _left_args,
- }),
- Expr::Column(c),
- )
- | (
- Expr::ScalarFunction(ScalarFunction {
- func_def:
-
ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::ArrayAppend),
+ func_def: ScalarFunctionDefinition::UDF(left_fun),
args: _left_args,
}),
Expr::Column(c),
- )
- | (
- Expr::ScalarFunction(ScalarFunction {
- func_def:
-
ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::ArrayConcat),
- args: _left_args,
- }),
- Expr::Column(c),
- ) => {
+ ) if ["array_append", "array_prepend", "array_concat"]
+ .contains(&left_fun.name()) =>
+ {
let d = schema.field_from_column(c)?.data_type();
let ndim = list_ndims(d);
match ndim {
- 0 => Ok(Some(BuiltinScalarFunction::ArrayAppend)),
- _ => Ok(Some(BuiltinScalarFunction::ArrayConcat)),
+ 0 => Ok(Some(array_append(left.clone(), right.clone()))),
+ _ => Ok(Some(array_concat(vec![left.clone(), right.clone()]))),
}
}
// 2) select column1 || column2
@@ -388,9 +343,9 @@ fn rewrite_array_concat_operator_to_func_for_column(
let ndim1 = list_ndims(d1);
let ndim2 = list_ndims(d2);
match (ndim1, ndim2) {
- (0, _) => Ok(Some(BuiltinScalarFunction::ArrayPrepend)),
- (_, 0) => Ok(Some(BuiltinScalarFunction::ArrayAppend)),
- _ => Ok(Some(BuiltinScalarFunction::ArrayConcat)),
+ (0, _) => Ok(Some(array_prepend(left.clone(), right.clone()))),
+ (_, 0) => Ok(Some(array_append(left.clone(), right.clone()))),
+ _ => Ok(Some(array_concat(vec![left.clone(), right.clone()]))),
}
}
_ => Ok(None),
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs
b/datafusion/optimizer/src/analyzer/type_coercion.rs
index 496def95e1..fabeba4393 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -46,8 +46,8 @@ use datafusion_expr::type_coercion::{is_datetime,
is_utf8_or_large_utf8};
use datafusion_expr::utils::merge_schema;
use datafusion_expr::{
is_false, is_not_false, is_not_true, is_not_unknown, is_true, is_unknown,
not,
- type_coercion, AggregateFunction, BuiltinScalarFunction, Expr,
ExprSchemable,
- LogicalPlan, Operator, Projection, ScalarFunctionDefinition, Signature,
WindowFrame,
+ type_coercion, AggregateFunction, Expr, ExprSchemable, LogicalPlan,
Operator,
+ Projection, ScalarFunctionDefinition, ScalarUDF, Signature, WindowFrame,
WindowFrameBound, WindowFrameUnits,
};
@@ -316,11 +316,6 @@ impl TreeNodeRewriter for TypeCoercionRewriter {
&self.schema,
&fun.signature(),
)?;
- let new_args = coerce_arguments_for_fun(
- new_args.as_slice(),
- &self.schema,
- &fun,
- )?;
Ok(Transformed::yes(Expr::ScalarFunction(ScalarFunction::new(
fun, new_args,
))))
@@ -331,6 +326,11 @@ impl TreeNodeRewriter for TypeCoercionRewriter {
&self.schema,
fun.signature(),
)?;
+ let new_expr = coerce_arguments_for_fun(
+ new_expr.as_slice(),
+ &self.schema,
+ &fun,
+ )?;
Ok(Transformed::yes(Expr::ScalarFunction(
ScalarFunction::new_udf(fun, new_expr),
)))
@@ -583,7 +583,7 @@ fn coerce_arguments_for_signature(
fn coerce_arguments_for_fun(
expressions: &[Expr],
schema: &DFSchema,
- fun: &BuiltinScalarFunction,
+ fun: &Arc<ScalarUDF>,
) -> Result<Vec<Expr>> {
if expressions.is_empty() {
return Ok(vec![]);
@@ -591,7 +591,7 @@ fn coerce_arguments_for_fun(
let mut expressions: Vec<Expr> = expressions.to_vec();
// Cast Fixedsizelist to List for array functions
- if *fun == BuiltinScalarFunction::MakeArray {
+ if fun.name() == "make_array" {
expressions = expressions
.into_iter()
.map(|expr| {
@@ -776,6 +776,7 @@ mod test {
LogicalPlan, Operator, ScalarUDF, ScalarUDFImpl, Signature,
SimpleAggregateUDF,
Subquery, Volatility,
};
+ use datafusion_functions_array::expr_fn::make_array;
use datafusion_physical_expr::expressions::AvgAccumulator;
fn empty() -> Arc<LogicalPlan> {
@@ -1266,10 +1267,7 @@ mod test {
None,
),
)));
- let expr = Expr::ScalarFunction(ScalarFunction::new(
- BuiltinScalarFunction::MakeArray,
- vec![val.clone()],
- ));
+ let expr = make_array(vec![val.clone()]);
let schema = Arc::new(DFSchema::new_with_metadata(
vec![DFField::new_unqualified(
"item",
@@ -1298,10 +1296,7 @@ mod test {
&schema,
)?;
- let expected = Expr::ScalarFunction(ScalarFunction::new(
- BuiltinScalarFunction::MakeArray,
- vec![expected_casted_expr],
- ));
+ let expected = make_array(vec![expected_casted_expr]);
assert_eq!(result, expected);
Ok(())
diff --git a/datafusion/physical-expr/src/array_expressions.rs
b/datafusion/physical-expr/src/array_expressions.rs
index 5be72b0559..3f7ea57df2 100644
--- a/datafusion/physical-expr/src/array_expressions.rs
+++ b/datafusion/physical-expr/src/array_expressions.rs
@@ -33,10 +33,10 @@ use datafusion_common::cast::{
as_generic_list_array, as_generic_string_array, as_int64_array,
as_large_list_array,
as_list_array, as_string_array,
};
-use datafusion_common::utils::{array_into_list_array, list_ndims};
+use datafusion_common::utils::array_into_list_array;
use datafusion_common::{
- exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err,
- DataFusionError, Result, ScalarValue,
+ exec_err, internal_datafusion_err, internal_err, plan_err,
DataFusionError, Result,
+ ScalarValue,
};
use itertools::Itertools;
@@ -746,72 +746,6 @@ pub fn array_pop_back(args: &[ArrayRef]) ->
Result<ArrayRef> {
}
}
-/// Appends or prepends elements to a ListArray.
-///
-/// This function takes a ListArray, an ArrayRef, a FieldRef, and a boolean
flag
-/// indicating whether to append or prepend the elements. It returns a
`Result<ArrayRef>`
-/// representing the resulting ListArray after the operation.
-///
-/// # Arguments
-///
-/// * `list_array` - A reference to the ListArray to which elements will be
appended/prepended.
-/// * `element_array` - A reference to the Array containing elements to be
appended/prepended.
-/// * `field` - A reference to the Field describing the data type of the
arrays.
-/// * `is_append` - A boolean flag indicating whether to append (`true`) or
prepend (`false`) elements.
-///
-/// # Examples
-///
-/// generic_append_and_prepend(
-/// [1, 2, 3], 4, append => [1, 2, 3, 4]
-/// 5, [6, 7, 8], prepend => [5, 6, 7, 8]
-/// )
-fn generic_append_and_prepend<O: OffsetSizeTrait>(
- list_array: &GenericListArray<O>,
- element_array: &ArrayRef,
- data_type: &DataType,
- is_append: bool,
-) -> Result<ArrayRef>
-where
- i64: TryInto<O>,
-{
- let mut offsets = vec![O::usize_as(0)];
- let values = list_array.values();
- let original_data = values.to_data();
- let element_data = element_array.to_data();
- let capacity = Capacities::Array(original_data.len() + element_data.len());
-
- let mut mutable = MutableArrayData::with_capacities(
- vec![&original_data, &element_data],
- false,
- capacity,
- );
-
- let values_index = 0;
- let element_index = 1;
-
- for (row_index, offset_window) in
list_array.offsets().windows(2).enumerate() {
- let start = offset_window[0].to_usize().unwrap();
- let end = offset_window[1].to_usize().unwrap();
- if is_append {
- mutable.extend(values_index, start, end);
- mutable.extend(element_index, row_index, row_index + 1);
- } else {
- mutable.extend(element_index, row_index, row_index + 1);
- mutable.extend(values_index, start, end);
- }
- offsets.push(offsets[row_index] + O::usize_as(end - start + 1));
- }
-
- let data = mutable.freeze();
-
- Ok(Arc::new(GenericListArray::<O>::try_new(
- Arc::new(Field::new("item", data_type.to_owned(), true)),
- OffsetBuffer::new(offsets.into()),
- arrow_array::make_array(data),
- None,
- )?))
-}
-
/// Array_sort SQL function
pub fn array_sort(args: &[ArrayRef]) -> Result<ArrayRef> {
if args.is_empty() || args.len() > 3 {
@@ -895,189 +829,6 @@ fn order_nulls_first(modifier: &str) -> Result<bool> {
}
}
-fn general_append_and_prepend<O: OffsetSizeTrait>(
- args: &[ArrayRef],
- is_append: bool,
-) -> Result<ArrayRef>
-where
- i64: TryInto<O>,
-{
- let (list_array, element_array) = if is_append {
- let list_array = as_generic_list_array::<O>(&args[0])?;
- let element_array = &args[1];
- check_datatypes("array_append", &[element_array,
list_array.values()])?;
- (list_array, element_array)
- } else {
- let list_array = as_generic_list_array::<O>(&args[1])?;
- let element_array = &args[0];
- check_datatypes("array_prepend", &[list_array.values(),
element_array])?;
- (list_array, element_array)
- };
-
- let res = match list_array.value_type() {
- DataType::List(_) => concat_internal::<i32>(args)?,
- DataType::LargeList(_) => concat_internal::<i64>(args)?,
- data_type => {
- return generic_append_and_prepend::<O>(
- list_array,
- element_array,
- &data_type,
- is_append,
- );
- }
- };
-
- Ok(res)
-}
-
-/// Array_append SQL function
-pub fn array_append(args: &[ArrayRef]) -> Result<ArrayRef> {
- if args.len() != 2 {
- return exec_err!("array_append expects two arguments");
- }
-
- match args[0].data_type() {
- DataType::LargeList(_) => general_append_and_prepend::<i64>(args,
true),
- _ => general_append_and_prepend::<i32>(args, true),
- }
-}
-
-/// Array_prepend SQL function
-pub fn array_prepend(args: &[ArrayRef]) -> Result<ArrayRef> {
- if args.len() != 2 {
- return exec_err!("array_prepend expects two arguments");
- }
-
- match args[1].data_type() {
- DataType::LargeList(_) => general_append_and_prepend::<i64>(args,
false),
- _ => general_append_and_prepend::<i32>(args, false),
- }
-}
-
-fn align_array_dimensions<O: OffsetSizeTrait>(
- args: Vec<ArrayRef>,
-) -> Result<Vec<ArrayRef>> {
- let args_ndim = args
- .iter()
- .map(|arg| datafusion_common::utils::list_ndims(arg.data_type()))
- .collect::<Vec<_>>();
- let max_ndim = args_ndim.iter().max().unwrap_or(&0);
-
- // Align the dimensions of the arrays
- let aligned_args: Result<Vec<ArrayRef>> = args
- .into_iter()
- .zip(args_ndim.iter())
- .map(|(array, ndim)| {
- if ndim < max_ndim {
- let mut aligned_array = array.clone();
- for _ in 0..(max_ndim - ndim) {
- let data_type = aligned_array.data_type().to_owned();
- let array_lengths = vec![1; aligned_array.len()];
- let offsets =
OffsetBuffer::<O>::from_lengths(array_lengths);
-
- aligned_array = Arc::new(GenericListArray::<O>::try_new(
- Arc::new(Field::new("item", data_type, true)),
- offsets,
- aligned_array,
- None,
- )?)
- }
- Ok(aligned_array)
- } else {
- Ok(array.clone())
- }
- })
- .collect();
-
- aligned_args
-}
-
-// Concatenate arrays on the same row.
-fn concat_internal<O: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> {
- let args = align_array_dimensions::<O>(args.to_vec())?;
-
- let list_arrays = args
- .iter()
- .map(|arg| as_generic_list_array::<O>(arg))
- .collect::<Result<Vec<_>>>()?;
- // Assume number of rows is the same for all arrays
- let row_count = list_arrays[0].len();
-
- let mut array_lengths = vec![];
- let mut arrays = vec![];
- let mut valid = BooleanBufferBuilder::new(row_count);
- for i in 0..row_count {
- let nulls = list_arrays
- .iter()
- .map(|arr| arr.is_null(i))
- .collect::<Vec<_>>();
-
- // If all the arrays are null, the concatenated array is null
- let is_null = nulls.iter().all(|&x| x);
- if is_null {
- array_lengths.push(0);
- valid.append(false);
- } else {
- // Get all the arrays on i-th row
- let values = list_arrays
- .iter()
- .map(|arr| arr.value(i))
- .collect::<Vec<_>>();
-
- let elements = values
- .iter()
- .map(|a| a.as_ref())
- .collect::<Vec<&dyn Array>>();
-
- // Concatenated array on i-th row
- let concated_array = compute::concat(elements.as_slice())?;
- array_lengths.push(concated_array.len());
- arrays.push(concated_array);
- valid.append(true);
- }
- }
- // Assume all arrays have the same data type
- let data_type = list_arrays[0].value_type();
- let buffer = valid.finish();
-
- let elements = arrays
- .iter()
- .map(|a| a.as_ref())
- .collect::<Vec<&dyn Array>>();
-
- let list_arr = GenericListArray::<O>::new(
- Arc::new(Field::new("item", data_type, true)),
- OffsetBuffer::from_lengths(array_lengths),
- Arc::new(compute::concat(elements.as_slice())?),
- Some(NullBuffer::new(buffer)),
- );
-
- Ok(Arc::new(list_arr))
-}
-
-/// Array_concat/Array_cat SQL function
-pub fn array_concat(args: &[ArrayRef]) -> Result<ArrayRef> {
- if args.is_empty() {
- return exec_err!("array_concat expects at least one arguments");
- }
-
- let mut new_args = vec![];
- for arg in args {
- let ndim = list_ndims(arg.data_type());
- let base_type = datafusion_common::utils::base_type(arg.data_type());
- if ndim == 0 {
- return not_impl_err!("Array is not type '{base_type:?}'.");
- } else if !base_type.eq(&DataType::Null) {
- new_args.push(arg.clone());
- }
- }
-
- match &args[0].data_type() {
- DataType::LargeList(_) => concat_internal::<i64>(new_args.as_slice()),
- _ => concat_internal::<i32>(new_args.as_slice()),
- }
-}
-
/// Array_repeat SQL function
pub fn array_repeat(args: &[ArrayRef]) -> Result<ArrayRef> {
if args.len() != 2 {
@@ -2149,54 +1900,3 @@ where
Some(nulls.into()),
)?))
}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use arrow::datatypes::Int64Type;
-
- /// Only test internal functions, array-related sql functions will be
tested in sqllogictest `array.slt`
- #[test]
- fn test_align_array_dimensions() {
- let array1d_1 =
- Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
- Some(vec![Some(1), Some(2), Some(3)]),
- Some(vec![Some(4), Some(5)]),
- ]));
- let array1d_2 =
- Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
- Some(vec![Some(6), Some(7), Some(8)]),
- ]));
-
- let array2d_1 = Arc::new(array_into_list_array(array1d_1.clone())) as
ArrayRef;
- let array2d_2 = Arc::new(array_into_list_array(array1d_2.clone())) as
ArrayRef;
-
- let res = align_array_dimensions::<i32>(vec![
- array1d_1.to_owned(),
- array2d_2.to_owned(),
- ])
- .unwrap();
-
- let expected = as_list_array(&array2d_1).unwrap();
- let expected_dim =
datafusion_common::utils::list_ndims(array2d_1.data_type());
- assert_ne!(as_list_array(&res[0]).unwrap(), expected);
- assert_eq!(
- datafusion_common::utils::list_ndims(res[0].data_type()),
- expected_dim
- );
-
- let array3d_1 = Arc::new(array_into_list_array(array2d_1)) as ArrayRef;
- let array3d_2 = array_into_list_array(array2d_2.to_owned());
- let res =
- align_array_dimensions::<i32>(vec![array1d_1,
Arc::new(array3d_2.clone())])
- .unwrap();
-
- let expected = as_list_array(&array3d_1).unwrap();
- let expected_dim =
datafusion_common::utils::list_ndims(array3d_1.data_type());
- assert_ne!(as_list_array(&res[0]).unwrap(), expected);
- assert_eq!(
- datafusion_common::utils::list_ndims(res[0].data_type()),
- expected_dim
- );
- }
-}
diff --git a/datafusion/physical-expr/src/functions.rs
b/datafusion/physical-expr/src/functions.rs
index 2e1d48eb76..e9ac9bd2d6 100644
--- a/datafusion/physical-expr/src/functions.rs
+++ b/datafusion/physical-expr/src/functions.rs
@@ -302,15 +302,9 @@ pub fn create_physical_fun(
}
// array functions
- BuiltinScalarFunction::ArrayAppend => Arc::new(|args| {
- make_scalar_function_inner(array_expressions::array_append)(args)
- }),
BuiltinScalarFunction::ArraySort => Arc::new(|args| {
make_scalar_function_inner(array_expressions::array_sort)(args)
}),
- BuiltinScalarFunction::ArrayConcat => Arc::new(|args| {
- make_scalar_function_inner(array_expressions::array_concat)(args)
- }),
BuiltinScalarFunction::ArrayDistinct => Arc::new(|args| {
make_scalar_function_inner(array_expressions::array_distinct)(args)
}),
@@ -332,9 +326,6 @@ pub fn create_physical_fun(
BuiltinScalarFunction::ArrayPositions => Arc::new(|args| {
make_scalar_function_inner(array_expressions::array_positions)(args)
}),
- BuiltinScalarFunction::ArrayPrepend => Arc::new(|args| {
- make_scalar_function_inner(array_expressions::array_prepend)(args)
- }),
BuiltinScalarFunction::ArrayRepeat => Arc::new(|args| {
make_scalar_function_inner(array_expressions::array_repeat)(args)
}),
@@ -368,9 +359,6 @@ pub fn create_physical_fun(
BuiltinScalarFunction::ArrayResize => Arc::new(|args| {
make_scalar_function_inner(array_expressions::array_resize)(args)
}),
- BuiltinScalarFunction::MakeArray => Arc::new(|args| {
- make_scalar_function_inner(array_expressions::make_array)(args)
- }),
BuiltinScalarFunction::ArrayUnion => Arc::new(|args| {
make_scalar_function_inner(array_expressions::array_union)(args)
}),
diff --git a/datafusion/physical-expr/src/scalar_function.rs
b/datafusion/physical-expr/src/scalar_function.rs
index bfe0fdb279..1c9f0e609c 100644
--- a/datafusion/physical-expr/src/scalar_function.rs
+++ b/datafusion/physical-expr/src/scalar_function.rs
@@ -153,14 +153,15 @@ impl PhysicalExpr for ScalarFunctionExpr {
if scalar_fun
.signature()
.type_signature
- .supports_zero_argument()
- && scalar_fun != BuiltinScalarFunction::MakeArray =>
+ .supports_zero_argument() =>
{
vec![ColumnarValue::create_null_array(batch.num_rows())]
}
// If the function supports zero argument, we pass in a null array
indicating the batch size.
// This is for user-defined functions.
- (true, Err(_)) if self.supports_zero_argument => {
+ (true, Err(_))
+ if self.supports_zero_argument && self.name != "make_array" =>
+ {
vec![ColumnarValue::create_null_array(batch.num_rows())]
}
_ => self
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index c068b253ce..c2a36af2e7 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -569,7 +569,7 @@ enum ScalarFunction {
Sqrt = 17;
Tan = 18;
Trunc = 19;
- Array = 20;
+ // 20 was Array
// RegexpMatch = 21;
BitLength = 22;
Btrim = 23;
@@ -635,15 +635,15 @@ enum ScalarFunction {
Factorial = 83;
Lcm = 84;
Gcd = 85;
- ArrayAppend = 86;
- ArrayConcat = 87;
+ // 86 was ArrayAppend
+ // 87 was ArrayConcat
// 88 was ArrayDims
ArrayRepeat = 89;
// 90 was ArrayLength
// 91 was ArrayNdims
ArrayPosition = 92;
ArrayPositions = 93;
- ArrayPrepend = 94;
+ // 94 was ArrayPrepend
ArrayRemove = 95;
ArrayReplace = 96;
// 97 was ArrayToString
diff --git a/datafusion/proto/src/generated/pbjson.rs
b/datafusion/proto/src/generated/pbjson.rs
index 7a366c08ad..0ec6de8f40 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -22115,7 +22115,6 @@ impl serde::Serialize for ScalarFunction {
Self::Sqrt => "Sqrt",
Self::Tan => "Tan",
Self::Trunc => "Trunc",
- Self::Array => "Array",
Self::BitLength => "BitLength",
Self::Btrim => "Btrim",
Self::CharacterLength => "CharacterLength",
@@ -22171,12 +22170,9 @@ impl serde::Serialize for ScalarFunction {
Self::Factorial => "Factorial",
Self::Lcm => "Lcm",
Self::Gcd => "Gcd",
- Self::ArrayAppend => "ArrayAppend",
- Self::ArrayConcat => "ArrayConcat",
Self::ArrayRepeat => "ArrayRepeat",
Self::ArrayPosition => "ArrayPosition",
Self::ArrayPositions => "ArrayPositions",
- Self::ArrayPrepend => "ArrayPrepend",
Self::ArrayRemove => "ArrayRemove",
Self::ArrayReplace => "ArrayReplace",
Self::ArrayElement => "ArrayElement",
@@ -22234,7 +22230,6 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction {
"Sqrt",
"Tan",
"Trunc",
- "Array",
"BitLength",
"Btrim",
"CharacterLength",
@@ -22290,12 +22285,9 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction {
"Factorial",
"Lcm",
"Gcd",
- "ArrayAppend",
- "ArrayConcat",
"ArrayRepeat",
"ArrayPosition",
"ArrayPositions",
- "ArrayPrepend",
"ArrayRemove",
"ArrayReplace",
"ArrayElement",
@@ -22382,7 +22374,6 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction {
"Sqrt" => Ok(ScalarFunction::Sqrt),
"Tan" => Ok(ScalarFunction::Tan),
"Trunc" => Ok(ScalarFunction::Trunc),
- "Array" => Ok(ScalarFunction::Array),
"BitLength" => Ok(ScalarFunction::BitLength),
"Btrim" => Ok(ScalarFunction::Btrim),
"CharacterLength" => Ok(ScalarFunction::CharacterLength),
@@ -22438,12 +22429,9 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction {
"Factorial" => Ok(ScalarFunction::Factorial),
"Lcm" => Ok(ScalarFunction::Lcm),
"Gcd" => Ok(ScalarFunction::Gcd),
- "ArrayAppend" => Ok(ScalarFunction::ArrayAppend),
- "ArrayConcat" => Ok(ScalarFunction::ArrayConcat),
"ArrayRepeat" => Ok(ScalarFunction::ArrayRepeat),
"ArrayPosition" => Ok(ScalarFunction::ArrayPosition),
"ArrayPositions" => Ok(ScalarFunction::ArrayPositions),
- "ArrayPrepend" => Ok(ScalarFunction::ArrayPrepend),
"ArrayRemove" => Ok(ScalarFunction::ArrayRemove),
"ArrayReplace" => Ok(ScalarFunction::ArrayReplace),
"ArrayElement" => Ok(ScalarFunction::ArrayElement),
diff --git a/datafusion/proto/src/generated/prost.rs
b/datafusion/proto/src/generated/prost.rs
index 79decc1252..9b34b084c9 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -2641,7 +2641,7 @@ pub enum ScalarFunction {
Sqrt = 17,
Tan = 18,
Trunc = 19,
- Array = 20,
+ /// 20 was Array
/// RegexpMatch = 21;
BitLength = 22,
Btrim = 23,
@@ -2707,15 +2707,15 @@ pub enum ScalarFunction {
Factorial = 83,
Lcm = 84,
Gcd = 85,
- ArrayAppend = 86,
- ArrayConcat = 87,
+ /// 86 was ArrayAppend
+ /// 87 was ArrayConcat
/// 88 was ArrayDims
ArrayRepeat = 89,
/// 90 was ArrayLength
/// 91 was ArrayNdims
ArrayPosition = 92,
ArrayPositions = 93,
- ArrayPrepend = 94,
+ /// 94 was ArrayPrepend
ArrayRemove = 95,
ArrayReplace = 96,
/// 97 was ArrayToString
@@ -2785,7 +2785,6 @@ impl ScalarFunction {
ScalarFunction::Sqrt => "Sqrt",
ScalarFunction::Tan => "Tan",
ScalarFunction::Trunc => "Trunc",
- ScalarFunction::Array => "Array",
ScalarFunction::BitLength => "BitLength",
ScalarFunction::Btrim => "Btrim",
ScalarFunction::CharacterLength => "CharacterLength",
@@ -2841,12 +2840,9 @@ impl ScalarFunction {
ScalarFunction::Factorial => "Factorial",
ScalarFunction::Lcm => "Lcm",
ScalarFunction::Gcd => "Gcd",
- ScalarFunction::ArrayAppend => "ArrayAppend",
- ScalarFunction::ArrayConcat => "ArrayConcat",
ScalarFunction::ArrayRepeat => "ArrayRepeat",
ScalarFunction::ArrayPosition => "ArrayPosition",
ScalarFunction::ArrayPositions => "ArrayPositions",
- ScalarFunction::ArrayPrepend => "ArrayPrepend",
ScalarFunction::ArrayRemove => "ArrayRemove",
ScalarFunction::ArrayReplace => "ArrayReplace",
ScalarFunction::ArrayElement => "ArrayElement",
@@ -2898,7 +2894,6 @@ impl ScalarFunction {
"Sqrt" => Some(Self::Sqrt),
"Tan" => Some(Self::Tan),
"Trunc" => Some(Self::Trunc),
- "Array" => Some(Self::Array),
"BitLength" => Some(Self::BitLength),
"Btrim" => Some(Self::Btrim),
"CharacterLength" => Some(Self::CharacterLength),
@@ -2954,12 +2949,9 @@ impl ScalarFunction {
"Factorial" => Some(Self::Factorial),
"Lcm" => Some(Self::Lcm),
"Gcd" => Some(Self::Gcd),
- "ArrayAppend" => Some(Self::ArrayAppend),
- "ArrayConcat" => Some(Self::ArrayConcat),
"ArrayRepeat" => Some(Self::ArrayRepeat),
"ArrayPosition" => Some(Self::ArrayPosition),
"ArrayPositions" => Some(Self::ArrayPositions),
- "ArrayPrepend" => Some(Self::ArrayPrepend),
"ArrayRemove" => Some(Self::ArrayRemove),
"ArrayReplace" => Some(Self::ArrayReplace),
"ArrayElement" => Some(Self::ArrayElement),
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs
b/datafusion/proto/src/logical_plan/from_proto.rs
index cc5491b3f2..8dba553b48 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -47,14 +47,13 @@ use datafusion_common::{
use datafusion_expr::expr::Unnest;
use datafusion_expr::window_frame::{check_window_frame,
regularize_window_order_by};
use datafusion_expr::{
- acosh, array, array_append, array_concat, array_distinct, array_element,
- array_except, array_intersect, array_pop_back, array_pop_front,
array_position,
- array_positions, array_prepend, array_remove, array_remove_all,
array_remove_n,
- array_repeat, array_replace, array_replace_all, array_replace_n,
array_resize,
- array_slice, array_sort, array_union, arrow_typeof, ascii, asinh, atan,
atan2, atanh,
- bit_length, btrim, cbrt, ceil, character_length, chr, coalesce,
concat_expr,
- concat_ws_expr, cos, cosh, cot, current_date, current_time, degrees,
digest,
- ends_with, exp,
+ acosh, array_distinct, array_element, array_except, array_intersect,
array_pop_back,
+ array_pop_front, array_position, array_positions, array_remove,
array_remove_all,
+ array_remove_n, array_repeat, array_replace, array_replace_all,
array_replace_n,
+ array_resize, array_slice, array_sort, array_union, arrow_typeof, ascii,
asinh, atan,
+ atan2, atanh, bit_length, btrim, cbrt, ceil, character_length, chr,
coalesce,
+ concat_expr, concat_ws_expr, cos, cosh, cot, current_date, current_time,
degrees,
+ digest, ends_with, exp,
expr::{self, InList, Sort, WindowFunction},
factorial, find_in_set, floor, from_unixtime, gcd, initcap, iszero, lcm,
left,
levenshtein, ln, log, log10, log2,
@@ -477,9 +476,7 @@ impl From<&protobuf::ScalarFunction> for
BuiltinScalarFunction {
ScalarFunction::Trim => Self::Trim,
ScalarFunction::Ltrim => Self::Ltrim,
ScalarFunction::Rtrim => Self::Rtrim,
- ScalarFunction::ArrayAppend => Self::ArrayAppend,
ScalarFunction::ArraySort => Self::ArraySort,
- ScalarFunction::ArrayConcat => Self::ArrayConcat,
ScalarFunction::ArrayExcept => Self::ArrayExcept,
ScalarFunction::ArrayDistinct => Self::ArrayDistinct,
ScalarFunction::ArrayElement => Self::ArrayElement,
@@ -487,7 +484,6 @@ impl From<&protobuf::ScalarFunction> for
BuiltinScalarFunction {
ScalarFunction::ArrayPopBack => Self::ArrayPopBack,
ScalarFunction::ArrayPosition => Self::ArrayPosition,
ScalarFunction::ArrayPositions => Self::ArrayPositions,
- ScalarFunction::ArrayPrepend => Self::ArrayPrepend,
ScalarFunction::ArrayRepeat => Self::ArrayRepeat,
ScalarFunction::ArrayRemove => Self::ArrayRemove,
ScalarFunction::ArrayRemoveN => Self::ArrayRemoveN,
@@ -500,7 +496,6 @@ impl From<&protobuf::ScalarFunction> for
BuiltinScalarFunction {
ScalarFunction::ArrayIntersect => Self::ArrayIntersect,
ScalarFunction::ArrayUnion => Self::ArrayUnion,
ScalarFunction::ArrayResize => Self::ArrayResize,
- ScalarFunction::Array => Self::MakeArray,
ScalarFunction::Md5 => Self::MD5,
ScalarFunction::Sha224 => Self::SHA224,
ScalarFunction::Sha256 => Self::SHA256,
@@ -1409,16 +1404,6 @@ pub fn parse_expr(
ScalarFunction::Acosh => {
Ok(acosh(parse_expr(&args[0], registry, codec)?))
}
- ScalarFunction::Array => Ok(array(
- args.to_owned()
- .iter()
- .map(|expr| parse_expr(expr, registry, codec))
- .collect::<Result<Vec<_>, _>>()?,
- )),
- ScalarFunction::ArrayAppend => Ok(array_append(
- parse_expr(&args[0], registry, codec)?,
- parse_expr(&args[1], registry, codec)?,
- )),
ScalarFunction::ArraySort => Ok(array_sort(
parse_expr(&args[0], registry, codec)?,
parse_expr(&args[1], registry, codec)?,
@@ -1430,16 +1415,6 @@ pub fn parse_expr(
ScalarFunction::ArrayPopBack => {
Ok(array_pop_back(parse_expr(&args[0], registry, codec)?))
}
- ScalarFunction::ArrayPrepend => Ok(array_prepend(
- parse_expr(&args[0], registry, codec)?,
- parse_expr(&args[1], registry, codec)?,
- )),
- ScalarFunction::ArrayConcat => Ok(array_concat(
- args.to_owned()
- .iter()
- .map(|expr| parse_expr(expr, registry, codec))
- .collect::<Result<Vec<_>, _>>()?,
- )),
ScalarFunction::ArrayExcept => Ok(array_except(
parse_expr(&args[0], registry, codec)?,
parse_expr(&args[1], registry, codec)?,
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs
b/datafusion/proto/src/logical_plan/to_proto.rs
index 77b576dcd3..393cc78267 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -1456,9 +1456,7 @@ impl TryFrom<&BuiltinScalarFunction> for
protobuf::ScalarFunction {
BuiltinScalarFunction::Ltrim => Self::Ltrim,
BuiltinScalarFunction::Rtrim => Self::Rtrim,
BuiltinScalarFunction::ToChar => Self::ToChar,
- BuiltinScalarFunction::ArrayAppend => Self::ArrayAppend,
BuiltinScalarFunction::ArraySort => Self::ArraySort,
- BuiltinScalarFunction::ArrayConcat => Self::ArrayConcat,
BuiltinScalarFunction::ArrayExcept => Self::ArrayExcept,
BuiltinScalarFunction::ArrayDistinct => Self::ArrayDistinct,
BuiltinScalarFunction::ArrayElement => Self::ArrayElement,
@@ -1466,7 +1464,6 @@ impl TryFrom<&BuiltinScalarFunction> for
protobuf::ScalarFunction {
BuiltinScalarFunction::ArrayPopBack => Self::ArrayPopBack,
BuiltinScalarFunction::ArrayPosition => Self::ArrayPosition,
BuiltinScalarFunction::ArrayPositions => Self::ArrayPositions,
- BuiltinScalarFunction::ArrayPrepend => Self::ArrayPrepend,
BuiltinScalarFunction::ArrayRepeat => Self::ArrayRepeat,
BuiltinScalarFunction::ArrayResize => Self::ArrayResize,
BuiltinScalarFunction::ArrayRemove => Self::ArrayRemove,
@@ -1479,7 +1476,6 @@ impl TryFrom<&BuiltinScalarFunction> for
protobuf::ScalarFunction {
BuiltinScalarFunction::ArraySlice => Self::ArraySlice,
BuiltinScalarFunction::ArrayIntersect => Self::ArrayIntersect,
BuiltinScalarFunction::ArrayUnion => Self::ArrayUnion,
- BuiltinScalarFunction::MakeArray => Self::Array,
BuiltinScalarFunction::MD5 => Self::Md5,
BuiltinScalarFunction::SHA224 => Self::Sha224,
BuiltinScalarFunction::SHA256 => Self::Sha256,
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index fb9f296755..76402604ac 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -582,24 +582,31 @@ async fn roundtrip_expr_api() -> Result<()> {
let expr_list = vec![
encode(col("a").cast_to(&DataType::Utf8, &schema)?, lit("hex")),
decode(lit("1234"), lit("hex")),
- array_to_string(array(vec![lit(1), lit(2), lit(3)]), lit(",")),
- array_dims(array(vec![lit(1), lit(2), lit(3)])),
- array_ndims(array(vec![lit(1), lit(2), lit(3)])),
- cardinality(array(vec![lit(1), lit(2), lit(3)])),
+ array_to_string(make_array(vec![lit(1), lit(2), lit(3)]), lit(",")),
+ array_dims(make_array(vec![lit(1), lit(2), lit(3)])),
+ array_ndims(make_array(vec![lit(1), lit(2), lit(3)])),
+ cardinality(make_array(vec![lit(1), lit(2), lit(3)])),
range(lit(1), lit(10), lit(2)),
gen_series(lit(1), lit(10), lit(2)),
- array_has(array(vec![lit(1), lit(2), lit(3)]), lit(1)),
+ array_append(make_array(vec![lit(1), lit(2), lit(3)]), lit(4)),
+ array_prepend(lit(1), make_array(vec![lit(2), lit(3), lit(4)])),
+ array_concat(vec![
+ make_array(vec![lit(1), lit(2)]),
+ make_array(vec![lit(3), lit(4)]),
+ ]),
+ make_array(vec![lit(1), lit(2), lit(3)]),
+ array_has(make_array(vec![lit(1), lit(2), lit(3)]), lit(1)),
array_has_all(
- array(vec![lit(1), lit(2), lit(3)]),
- array(vec![lit(1), lit(2)]),
+ make_array(vec![lit(1), lit(2), lit(3)]),
+ make_array(vec![lit(1), lit(2)]),
),
array_has_any(
- array(vec![lit(1), lit(2), lit(3)]),
- array(vec![lit(1), lit(4)]),
+ make_array(vec![lit(1), lit(2), lit(3)]),
+ make_array(vec![lit(1), lit(4)]),
),
- array_empty(array(vec![lit(1), lit(2), lit(3)])),
- array_length(array(vec![lit(1), lit(2), lit(3)])),
- flatten(array(vec![lit(1), lit(2), lit(3)])),
+ array_empty(make_array(vec![lit(1), lit(2), lit(3)])),
+ array_length(make_array(vec![lit(1), lit(2), lit(3)])),
+ flatten(make_array(vec![lit(1), lit(2), lit(3)])),
];
// ensure expressions created with the expr api can be round tripped
diff --git a/datafusion/sql/src/expr/value.rs b/datafusion/sql/src/expr/value.rs
index 15524b9ffa..8d19b32b8e 100644
--- a/datafusion/sql/src/expr/value.rs
+++ b/datafusion/sql/src/expr/value.rs
@@ -22,9 +22,7 @@ use arrow_schema::DataType;
use datafusion_common::{
not_impl_err, plan_err, DFSchema, DataFusionError, Result, ScalarValue,
};
-use datafusion_expr::expr::ScalarFunction;
-use datafusion_expr::expr::{BinaryExpr, Placeholder};
-use datafusion_expr::BuiltinScalarFunction;
+use datafusion_expr::expr::{BinaryExpr, Placeholder, ScalarFunction};
use datafusion_expr::{lit, Expr, Operator};
use log::debug;
use sqlparser::ast::{BinaryOperator, Expr as SQLExpr, Interval, Value};
@@ -143,10 +141,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
})
.collect::<Result<Vec<_>>>()?;
- Ok(Expr::ScalarFunction(ScalarFunction::new(
- BuiltinScalarFunction::MakeArray,
- values,
- )))
+ if let Some(udf) =
self.context_provider.get_function_meta("make_array") {
+ Ok(Expr::ScalarFunction(ScalarFunction::new_udf(udf, values)))
+ } else {
                not_impl_err!(
                    "array_expressions feature is disabled, so you should implement the
make_array UDF yourself"
                )
+ }
}
/// Convert a SQL interval expression to a DataFusion logical plan