This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 51da92fb9f Provide DataFrame API for `map` and move `map` to `functions-array` (#11560)
51da92fb9f is described below
commit 51da92fb9fe1b1bc2344fa78be52c448b36880d9
Author: Jax Liu <[email protected]>
AuthorDate: Mon Jul 22 20:36:58 2024 +0800
Provide DataFrame API for `map` and move `map` to `functions-array` (#11560)
* move map to `functions-array` and implement dataframe api
* add benchmark for dataframe api
* fix format
* add roundtrip_expr_api test
---
datafusion/core/Cargo.toml | 5 ++
datafusion/core/benches/map_query_sql.rs | 93 ++++++++++++++++++++++
.../core/tests/dataframe/dataframe_functions.rs | 22 +++++
datafusion/functions-array/benches/map.rs | 37 ++++++++-
datafusion/functions-array/src/lib.rs | 3 +
.../src/core => functions-array/src}/map.rs | 35 +++++---
datafusion/functions-array/src/planner.rs | 6 +-
datafusion/functions/Cargo.toml | 5 --
datafusion/functions/benches/map.rs | 80 -------------------
datafusion/functions/src/core/mod.rs | 7 --
.../proto/tests/cases/roundtrip_logical_plan.rs | 5 ++
11 files changed, 189 insertions(+), 109 deletions(-)
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index c937a6f6e5..4301396b23 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -217,3 +217,8 @@ name = "topk_aggregate"
[[bench]]
harness = false
name = "parquet_statistic"
+
+[[bench]]
+harness = false
+name = "map_query_sql"
+required-features = ["array_expressions"]
diff --git a/datafusion/core/benches/map_query_sql.rs b/datafusion/core/benches/map_query_sql.rs
new file mode 100644
index 0000000000..b6ac8b6b64
--- /dev/null
+++ b/datafusion/core/benches/map_query_sql.rs
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+use criterion::{black_box, criterion_group, criterion_main, Criterion};
+use parking_lot::Mutex;
+use rand::prelude::ThreadRng;
+use rand::Rng;
+use tokio::runtime::Runtime;
+
+use datafusion::prelude::SessionContext;
+use datafusion_common::ScalarValue;
+use datafusion_expr::Expr;
+use datafusion_functions_array::map::map;
+
+mod data_utils;
+
+fn build_keys(rng: &mut ThreadRng) -> Vec<String> {
+ let mut keys = vec![];
+ for _ in 0..1000 {
+ keys.push(rng.gen_range(0..9999).to_string());
+ }
+ keys
+}
+
+fn build_values(rng: &mut ThreadRng) -> Vec<i32> {
+ let mut values = vec![];
+ for _ in 0..1000 {
+ values.push(rng.gen_range(0..9999));
+ }
+ values
+}
+
+fn t_batch(num: i32) -> RecordBatch {
+ let value: Vec<i32> = (0..num).collect();
+ let c1: ArrayRef = Arc::new(Int32Array::from(value));
+ RecordBatch::try_from_iter(vec![("c1", c1)]).unwrap()
+}
+
+fn create_context(num: i32) ->
datafusion_common::Result<Arc<Mutex<SessionContext>>> {
+ let ctx = SessionContext::new();
+ ctx.register_batch("t", t_batch(num))?;
+ Ok(Arc::new(Mutex::new(ctx)))
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+ let ctx = create_context(1).unwrap();
+ let rt = Runtime::new().unwrap();
+ let df = rt.block_on(ctx.lock().table("t")).unwrap();
+
+ let mut rng = rand::thread_rng();
+ let keys = build_keys(&mut rng);
+ let values = build_values(&mut rng);
+ let mut key_buffer = Vec::new();
+ let mut value_buffer = Vec::new();
+
+ for i in 0..1000 {
+
key_buffer.push(Expr::Literal(ScalarValue::Utf8(Some(keys[i].clone()))));
+ value_buffer.push(Expr::Literal(ScalarValue::Int32(Some(values[i]))));
+ }
+ c.bench_function("map_1000_1", |b| {
+ b.iter(|| {
+ black_box(
+ rt.block_on(
+ df.clone()
+ .select(vec![map(key_buffer.clone(),
value_buffer.clone())])
+ .unwrap()
+ .collect(),
+ )
+ .unwrap(),
+ );
+ });
+ });
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/datafusion/core/tests/dataframe/dataframe_functions.rs b/datafusion/core/tests/dataframe/dataframe_functions.rs
index 1c55c48fea..f7b02196d8 100644
--- a/datafusion/core/tests/dataframe/dataframe_functions.rs
+++ b/datafusion/core/tests/dataframe/dataframe_functions.rs
@@ -34,6 +34,7 @@ use datafusion_common::{DFSchema, ScalarValue};
use datafusion_expr::expr::Alias;
use datafusion_expr::ExprSchemable;
use datafusion_functions_aggregate::expr_fn::{approx_median,
approx_percentile_cont};
+use datafusion_functions_array::map::map;
fn test_schema() -> SchemaRef {
Arc::new(Schema::new(vec![
@@ -1087,3 +1088,24 @@ async fn test_fn_array_to_string() -> Result<()> {
Ok(())
}
+
+#[tokio::test]
+async fn test_fn_map() -> Result<()> {
+ let expr = map(
+ vec![lit("a"), lit("b"), lit("c")],
+ vec![lit(1), lit(2), lit(3)],
+ );
+ let expected = [
+
"+---------------------------------------------------------------------------------------+",
+ "|
map(make_array(Utf8(\"a\"),Utf8(\"b\"),Utf8(\"c\")),make_array(Int32(1),Int32(2),Int32(3)))
|",
+
"+---------------------------------------------------------------------------------------+",
+ "| {a: 1, b: 2, c: 3}
|",
+ "| {a: 1, b: 2, c: 3}
|",
+ "| {a: 1, b: 2, c: 3}
|",
+ "| {a: 1, b: 2, c: 3}
|",
+
"+---------------------------------------------------------------------------------------+",
+ ];
+ assert_fn_batches!(expr, expected);
+
+ Ok(())
+}
diff --git a/datafusion/functions-array/benches/map.rs b/datafusion/functions-array/benches/map.rs
index 2e9b45266a..c2e0e641e8 100644
--- a/datafusion/functions-array/benches/map.rs
+++ b/datafusion/functions-array/benches/map.rs
@@ -17,13 +17,18 @@
extern crate criterion;
+use arrow_array::{Int32Array, ListArray, StringArray};
+use arrow_buffer::{OffsetBuffer, ScalarBuffer};
+use arrow_schema::{DataType, Field};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use rand::prelude::ThreadRng;
use rand::Rng;
+use std::sync::Arc;
use datafusion_common::ScalarValue;
use datafusion_expr::planner::ExprPlanner;
-use datafusion_expr::Expr;
+use datafusion_expr::{ColumnarValue, Expr};
+use datafusion_functions_array::map::map_udf;
use datafusion_functions_array::planner::ArrayFunctionPlanner;
fn keys(rng: &mut ThreadRng) -> Vec<String> {
@@ -63,6 +68,36 @@ fn criterion_benchmark(c: &mut Criterion) {
);
});
});
+
+ c.bench_function("map_1000", |b| {
+ let mut rng = rand::thread_rng();
+ let field = Arc::new(Field::new("item", DataType::Utf8, true));
+ let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 1000]));
+ let key_list = ListArray::new(
+ field,
+ offsets,
+ Arc::new(StringArray::from(keys(&mut rng))),
+ None,
+ );
+ let field = Arc::new(Field::new("item", DataType::Int32, true));
+ let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 1000]));
+ let value_list = ListArray::new(
+ field,
+ offsets,
+ Arc::new(Int32Array::from(values(&mut rng))),
+ None,
+ );
+ let keys =
ColumnarValue::Scalar(ScalarValue::List(Arc::new(key_list)));
+ let values =
ColumnarValue::Scalar(ScalarValue::List(Arc::new(value_list)));
+
+ b.iter(|| {
+ black_box(
+ map_udf()
+ .invoke(&[keys.clone(), values.clone()])
+ .expect("map should work on valid values"),
+ );
+ });
+ });
}
criterion_group!(benches, criterion_benchmark);
diff --git a/datafusion/functions-array/src/lib.rs b/datafusion/functions-array/src/lib.rs
index 9717d29883..f68f59dcd6 100644
--- a/datafusion/functions-array/src/lib.rs
+++ b/datafusion/functions-array/src/lib.rs
@@ -41,6 +41,7 @@ pub mod extract;
pub mod flatten;
pub mod length;
pub mod make_array;
+pub mod map;
pub mod planner;
pub mod position;
pub mod range;
@@ -53,6 +54,7 @@ pub mod set_ops;
pub mod sort;
pub mod string;
pub mod utils;
+
use datafusion_common::Result;
use datafusion_execution::FunctionRegistry;
use datafusion_expr::ScalarUDF;
@@ -140,6 +142,7 @@ pub fn all_default_array_functions() -> Vec<Arc<ScalarUDF>> {
replace::array_replace_n_udf(),
replace::array_replace_all_udf(),
replace::array_replace_udf(),
+ map::map_udf(),
]
}
diff --git a/datafusion/functions/src/core/map.rs b/datafusion/functions-array/src/map.rs
similarity index 83%
rename from datafusion/functions/src/core/map.rs
rename to datafusion/functions-array/src/map.rs
index 2deef242f8..e218b501dc 100644
--- a/datafusion/functions/src/core/map.rs
+++ b/datafusion/functions-array/src/map.rs
@@ -15,17 +15,26 @@
// specific language governing permissions and limitations
// under the License.
+use crate::make_array::make_array;
+use arrow::array::ArrayData;
+use arrow_array::{Array, ArrayRef, MapArray, StructArray};
+use arrow_buffer::{Buffer, ToByteSlice};
+use arrow_schema::{DataType, Field, SchemaBuilder};
+use datafusion_common::{exec_err, ScalarValue};
+use datafusion_expr::expr::ScalarFunction;
+use datafusion_expr::{ColumnarValue, Expr, ScalarUDFImpl, Signature,
Volatility};
use std::any::Any;
use std::collections::VecDeque;
use std::sync::Arc;
-use arrow::array::{Array, ArrayData, ArrayRef, MapArray, StructArray};
-use arrow::datatypes::{DataType, Field, SchemaBuilder};
-use arrow_buffer::{Buffer, ToByteSlice};
+/// Returns a map created from a key list and a value list
+pub fn map(keys: Vec<Expr>, values: Vec<Expr>) -> Expr {
+ let keys = make_array(keys);
+ let values = make_array(values);
+ Expr::ScalarFunction(ScalarFunction::new_udf(map_udf(), vec![keys,
values]))
+}
-use datafusion_common::Result;
-use datafusion_common::{exec_err, ScalarValue};
-use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
+create_func!(MapFunc, map_udf);
/// Check if we can evaluate the expr to constant directly.
///
@@ -39,7 +48,7 @@ fn can_evaluate_to_const(args: &[ColumnarValue]) -> bool {
.all(|arg| matches!(arg, ColumnarValue::Scalar(_)))
}
-fn make_map_batch(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+fn make_map_batch(args: &[ColumnarValue]) ->
datafusion_common::Result<ColumnarValue> {
if args.len() != 2 {
return exec_err!(
"make_map requires exactly 2 arguments, got {} instead",
@@ -54,7 +63,9 @@ fn make_map_batch(args: &[ColumnarValue]) -> Result<ColumnarValue> {
make_map_batch_internal(key, value, can_evaluate_to_const)
}
-fn get_first_array_ref(columnar_value: &ColumnarValue) -> Result<ArrayRef> {
+fn get_first_array_ref(
+ columnar_value: &ColumnarValue,
+) -> datafusion_common::Result<ArrayRef> {
match columnar_value {
ColumnarValue::Scalar(value) => match value {
ScalarValue::List(array) => Ok(array.value(0)),
@@ -70,7 +81,7 @@ fn make_map_batch_internal(
keys: ArrayRef,
values: ArrayRef,
can_evaluate_to_const: bool,
-) -> Result<ColumnarValue> {
+) -> datafusion_common::Result<ColumnarValue> {
if keys.null_count() > 0 {
return exec_err!("map key cannot be null");
}
@@ -150,7 +161,7 @@ impl ScalarUDFImpl for MapFunc {
&self.signature
}
- fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+ fn return_type(&self, arg_types: &[DataType]) ->
datafusion_common::Result<DataType> {
if arg_types.len() % 2 != 0 {
return exec_err!(
"map requires an even number of arguments, got {} instead",
@@ -175,12 +186,12 @@ impl ScalarUDFImpl for MapFunc {
))
}
- fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+ fn invoke(&self, args: &[ColumnarValue]) ->
datafusion_common::Result<ColumnarValue> {
make_map_batch(args)
}
}
-fn get_element_type(data_type: &DataType) -> Result<&DataType> {
+fn get_element_type(data_type: &DataType) ->
datafusion_common::Result<&DataType> {
match data_type {
DataType::List(element) => Ok(element.data_type()),
DataType::LargeList(element) => Ok(element.data_type()),
diff --git a/datafusion/functions-array/src/planner.rs b/datafusion/functions-array/src/planner.rs
index fbb541d9b1..c63c2c83e6 100644
--- a/datafusion/functions-array/src/planner.rs
+++ b/datafusion/functions-array/src/planner.rs
@@ -27,6 +27,7 @@ use datafusion_expr::{
use datafusion_functions::expr_fn::get_field;
use datafusion_functions_aggregate::nth_value::nth_value_udaf;
+use crate::map::map_udf;
use crate::{
array_has::array_has_all,
expr_fn::{array_append, array_concat, array_prepend},
@@ -111,10 +112,7 @@ impl ExprPlanner for ArrayFunctionPlanner {
let values = make_array(values.into_iter().map(|(_, e)| e).collect());
Ok(PlannerResult::Planned(Expr::ScalarFunction(
- ScalarFunction::new_udf(
- datafusion_functions::core::map(),
- vec![keys, values],
- ),
+ ScalarFunction::new_udf(map_udf(), vec![keys, values]),
)))
}
}
diff --git a/datafusion/functions/Cargo.toml b/datafusion/functions/Cargo.toml
index b143080b19..0281676cab 100644
--- a/datafusion/functions/Cargo.toml
+++ b/datafusion/functions/Cargo.toml
@@ -141,8 +141,3 @@ required-features = ["string_expressions"]
harness = false
name = "upper"
required-features = ["string_expressions"]
-
-[[bench]]
-harness = false
-name = "map"
-required-features = ["core_expressions"]
diff --git a/datafusion/functions/benches/map.rs b/datafusion/functions/benches/map.rs
deleted file mode 100644
index 811c21a41b..0000000000
--- a/datafusion/functions/benches/map.rs
+++ /dev/null
@@ -1,80 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-extern crate criterion;
-
-use arrow::array::{Int32Array, ListArray, StringArray};
-use arrow::datatypes::{DataType, Field};
-use arrow_buffer::{OffsetBuffer, ScalarBuffer};
-use criterion::{black_box, criterion_group, criterion_main, Criterion};
-use datafusion_common::ScalarValue;
-use datafusion_expr::ColumnarValue;
-use datafusion_functions::core::map;
-use rand::prelude::ThreadRng;
-use rand::Rng;
-use std::sync::Arc;
-
-fn keys(rng: &mut ThreadRng) -> Vec<String> {
- let mut keys = vec![];
- for _ in 0..1000 {
- keys.push(rng.gen_range(0..9999).to_string());
- }
- keys
-}
-
-fn values(rng: &mut ThreadRng) -> Vec<i32> {
- let mut values = vec![];
- for _ in 0..1000 {
- values.push(rng.gen_range(0..9999));
- }
- values
-}
-
-fn criterion_benchmark(c: &mut Criterion) {
- c.bench_function("map_1000", |b| {
- let mut rng = rand::thread_rng();
- let field = Arc::new(Field::new("item", DataType::Utf8, true));
- let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 1000]));
- let key_list = ListArray::new(
- field,
- offsets,
- Arc::new(StringArray::from(keys(&mut rng))),
- None,
- );
- let field = Arc::new(Field::new("item", DataType::Int32, true));
- let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 1000]));
- let value_list = ListArray::new(
- field,
- offsets,
- Arc::new(Int32Array::from(values(&mut rng))),
- None,
- );
- let keys =
ColumnarValue::Scalar(ScalarValue::List(Arc::new(key_list)));
- let values =
ColumnarValue::Scalar(ScalarValue::List(Arc::new(value_list)));
-
- b.iter(|| {
- black_box(
- map()
- .invoke(&[keys.clone(), values.clone()])
- .expect("map should work on valid values"),
- );
- });
- });
-}
-
-criterion_group!(benches, criterion_benchmark);
-criterion_main!(benches);
diff --git a/datafusion/functions/src/core/mod.rs b/datafusion/functions/src/core/mod.rs
index ee0309e593..8c51213972 100644
--- a/datafusion/functions/src/core/mod.rs
+++ b/datafusion/functions/src/core/mod.rs
@@ -25,7 +25,6 @@ pub mod arrowtypeof;
pub mod coalesce;
pub mod expr_ext;
pub mod getfield;
-pub mod map;
pub mod named_struct;
pub mod nullif;
pub mod nvl;
@@ -43,7 +42,6 @@ make_udf_function!(r#struct::StructFunc, STRUCT, r#struct);
make_udf_function!(named_struct::NamedStructFunc, NAMED_STRUCT, named_struct);
make_udf_function!(getfield::GetFieldFunc, GET_FIELD, get_field);
make_udf_function!(coalesce::CoalesceFunc, COALESCE, coalesce);
-make_udf_function!(map::MapFunc, MAP, map);
pub mod expr_fn {
use datafusion_expr::{Expr, Literal};
@@ -80,10 +78,6 @@ pub mod expr_fn {
coalesce,
"Returns `coalesce(args...)`, which evaluates to the value of the
first expr which is not NULL",
args,
- ),(
- map,
- "Returns a map created from a key list and a value list",
- args,
));
#[doc = "Returns the value of the field with the given name from the
struct"]
@@ -101,6 +95,5 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
arrow_typeof(),
named_struct(),
coalesce(),
- map(),
]
}
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 11945f3958..3476d5d042 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -44,6 +44,7 @@ use datafusion::functions_aggregate::expr_fn::{
count_distinct, covar_pop, covar_samp, first_value, grouping, median,
stddev,
stddev_pop, sum, var_pop, var_sample,
};
+use datafusion::functions_array::map::map;
use datafusion::prelude::*;
use datafusion::test_util::{TestTableFactory, TestTableProvider};
use datafusion_common::config::TableOptions;
@@ -704,6 +705,10 @@ async fn roundtrip_expr_api() -> Result<()> {
bool_or(lit(true)),
array_agg(lit(1)),
array_agg(lit(1)).distinct().build().unwrap(),
+ map(
+ vec![lit(1), lit(2), lit(3)],
+ vec![lit(10), lit(20), lit(30)],
+ ),
];
// ensure expressions created with the expr api can be round tripped
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]