This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 51da92fb9f Provide DataFrame API for `map` and move `map` to `functions-array` (#11560)
51da92fb9f is described below

commit 51da92fb9fe1b1bc2344fa78be52c448b36880d9
Author: Jax Liu <[email protected]>
AuthorDate: Mon Jul 22 20:36:58 2024 +0800

    Provide DataFrame API for `map` and move `map` to `functions-array` (#11560)
    
    * move map to `functions-array` and implement dataframe api
    
    * add benchmark for dataframe api
    
    * fix format
    
    * add roundtrip_expr_api test
---
 datafusion/core/Cargo.toml                         |  5 ++
 datafusion/core/benches/map_query_sql.rs           | 93 ++++++++++++++++++++++
 .../core/tests/dataframe/dataframe_functions.rs    | 22 +++++
 datafusion/functions-array/benches/map.rs          | 37 ++++++++-
 datafusion/functions-array/src/lib.rs              |  3 +
 .../src/core => functions-array/src}/map.rs        | 35 +++++---
 datafusion/functions-array/src/planner.rs          |  6 +-
 datafusion/functions/Cargo.toml                    |  5 --
 datafusion/functions/benches/map.rs                | 80 -------------------
 datafusion/functions/src/core/mod.rs               |  7 --
 .../proto/tests/cases/roundtrip_logical_plan.rs    |  5 ++
 11 files changed, 189 insertions(+), 109 deletions(-)

diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index c937a6f6e5..4301396b23 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -217,3 +217,8 @@ name = "topk_aggregate"
 [[bench]]
 harness = false
 name = "parquet_statistic"
+
+[[bench]]
+harness = false
+name = "map_query_sql"
+required-features = ["array_expressions"]
diff --git a/datafusion/core/benches/map_query_sql.rs b/datafusion/core/benches/map_query_sql.rs
new file mode 100644
index 0000000000..b6ac8b6b64
--- /dev/null
+++ b/datafusion/core/benches/map_query_sql.rs
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+use criterion::{black_box, criterion_group, criterion_main, Criterion};
+use parking_lot::Mutex;
+use rand::prelude::ThreadRng;
+use rand::Rng;
+use tokio::runtime::Runtime;
+
+use datafusion::prelude::SessionContext;
+use datafusion_common::ScalarValue;
+use datafusion_expr::Expr;
+use datafusion_functions_array::map::map;
+
+mod data_utils;
+
+fn build_keys(rng: &mut ThreadRng) -> Vec<String> {
+    let mut keys = vec![];
+    for _ in 0..1000 {
+        keys.push(rng.gen_range(0..9999).to_string());
+    }
+    keys
+}
+
+fn build_values(rng: &mut ThreadRng) -> Vec<i32> {
+    let mut values = vec![];
+    for _ in 0..1000 {
+        values.push(rng.gen_range(0..9999));
+    }
+    values
+}
+
+fn t_batch(num: i32) -> RecordBatch {
+    let value: Vec<i32> = (0..num).collect();
+    let c1: ArrayRef = Arc::new(Int32Array::from(value));
+    RecordBatch::try_from_iter(vec![("c1", c1)]).unwrap()
+}
+
+fn create_context(num: i32) -> datafusion_common::Result<Arc<Mutex<SessionContext>>> {
+    let ctx = SessionContext::new();
+    ctx.register_batch("t", t_batch(num))?;
+    Ok(Arc::new(Mutex::new(ctx)))
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+    let ctx = create_context(1).unwrap();
+    let rt = Runtime::new().unwrap();
+    let df = rt.block_on(ctx.lock().table("t")).unwrap();
+
+    let mut rng = rand::thread_rng();
+    let keys = build_keys(&mut rng);
+    let values = build_values(&mut rng);
+    let mut key_buffer = Vec::new();
+    let mut value_buffer = Vec::new();
+
+    for i in 0..1000 {
+        key_buffer.push(Expr::Literal(ScalarValue::Utf8(Some(keys[i].clone()))));
+        value_buffer.push(Expr::Literal(ScalarValue::Int32(Some(values[i]))));
+    }
+    c.bench_function("map_1000_1", |b| {
+        b.iter(|| {
+            black_box(
+                rt.block_on(
+                    df.clone()
+                        .select(vec![map(key_buffer.clone(), value_buffer.clone())])
+                        .unwrap()
+                        .collect(),
+                )
+                .unwrap(),
+            );
+        });
+    });
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/datafusion/core/tests/dataframe/dataframe_functions.rs b/datafusion/core/tests/dataframe/dataframe_functions.rs
index 1c55c48fea..f7b02196d8 100644
--- a/datafusion/core/tests/dataframe/dataframe_functions.rs
+++ b/datafusion/core/tests/dataframe/dataframe_functions.rs
@@ -34,6 +34,7 @@ use datafusion_common::{DFSchema, ScalarValue};
 use datafusion_expr::expr::Alias;
 use datafusion_expr::ExprSchemable;
 use datafusion_functions_aggregate::expr_fn::{approx_median, approx_percentile_cont};
+use datafusion_functions_array::map::map;
 
 fn test_schema() -> SchemaRef {
     Arc::new(Schema::new(vec![
@@ -1087,3 +1088,24 @@ async fn test_fn_array_to_string() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn test_fn_map() -> Result<()> {
+    let expr = map(
+        vec![lit("a"), lit("b"), lit("c")],
+        vec![lit(1), lit(2), lit(3)],
+    );
+    let expected = [
+        "+---------------------------------------------------------------------------------------+",
+        "| map(make_array(Utf8(\"a\"),Utf8(\"b\"),Utf8(\"c\")),make_array(Int32(1),Int32(2),Int32(3))) |",
+        "+---------------------------------------------------------------------------------------+",
+        "| {a: 1, b: 2, c: 3}                                                                    |",
+        "| {a: 1, b: 2, c: 3}                                                                    |",
+        "| {a: 1, b: 2, c: 3}                                                                    |",
+        "| {a: 1, b: 2, c: 3}                                                                    |",
+        "+---------------------------------------------------------------------------------------+",
+    ];
+    assert_fn_batches!(expr, expected);
+
+    Ok(())
+}
diff --git a/datafusion/functions-array/benches/map.rs b/datafusion/functions-array/benches/map.rs
index 2e9b45266a..c2e0e641e8 100644
--- a/datafusion/functions-array/benches/map.rs
+++ b/datafusion/functions-array/benches/map.rs
@@ -17,13 +17,18 @@
 
 extern crate criterion;
 
+use arrow_array::{Int32Array, ListArray, StringArray};
+use arrow_buffer::{OffsetBuffer, ScalarBuffer};
+use arrow_schema::{DataType, Field};
 use criterion::{black_box, criterion_group, criterion_main, Criterion};
 use rand::prelude::ThreadRng;
 use rand::Rng;
+use std::sync::Arc;
 
 use datafusion_common::ScalarValue;
 use datafusion_expr::planner::ExprPlanner;
-use datafusion_expr::Expr;
+use datafusion_expr::{ColumnarValue, Expr};
+use datafusion_functions_array::map::map_udf;
 use datafusion_functions_array::planner::ArrayFunctionPlanner;
 
 fn keys(rng: &mut ThreadRng) -> Vec<String> {
@@ -63,6 +68,36 @@ fn criterion_benchmark(c: &mut Criterion) {
             );
         });
     });
+
+    c.bench_function("map_1000", |b| {
+        let mut rng = rand::thread_rng();
+        let field = Arc::new(Field::new("item", DataType::Utf8, true));
+        let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 1000]));
+        let key_list = ListArray::new(
+            field,
+            offsets,
+            Arc::new(StringArray::from(keys(&mut rng))),
+            None,
+        );
+        let field = Arc::new(Field::new("item", DataType::Int32, true));
+        let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 1000]));
+        let value_list = ListArray::new(
+            field,
+            offsets,
+            Arc::new(Int32Array::from(values(&mut rng))),
+            None,
+        );
+        let keys = ColumnarValue::Scalar(ScalarValue::List(Arc::new(key_list)));
+        let values = ColumnarValue::Scalar(ScalarValue::List(Arc::new(value_list)));
+
+        b.iter(|| {
+            black_box(
+                map_udf()
+                    .invoke(&[keys.clone(), values.clone()])
+                    .expect("map should work on valid values"),
+            );
+        });
+    });
 }
 
 criterion_group!(benches, criterion_benchmark);
diff --git a/datafusion/functions-array/src/lib.rs b/datafusion/functions-array/src/lib.rs
index 9717d29883..f68f59dcd6 100644
--- a/datafusion/functions-array/src/lib.rs
+++ b/datafusion/functions-array/src/lib.rs
@@ -41,6 +41,7 @@ pub mod extract;
 pub mod flatten;
 pub mod length;
 pub mod make_array;
+pub mod map;
 pub mod planner;
 pub mod position;
 pub mod range;
@@ -53,6 +54,7 @@ pub mod set_ops;
 pub mod sort;
 pub mod string;
 pub mod utils;
+
 use datafusion_common::Result;
 use datafusion_execution::FunctionRegistry;
 use datafusion_expr::ScalarUDF;
@@ -140,6 +142,7 @@ pub fn all_default_array_functions() -> Vec<Arc<ScalarUDF>> {
         replace::array_replace_n_udf(),
         replace::array_replace_all_udf(),
         replace::array_replace_udf(),
+        map::map_udf(),
     ]
 }
 
diff --git a/datafusion/functions/src/core/map.rs b/datafusion/functions-array/src/map.rs
similarity index 83%
rename from datafusion/functions/src/core/map.rs
rename to datafusion/functions-array/src/map.rs
index 2deef242f8..e218b501dc 100644
--- a/datafusion/functions/src/core/map.rs
+++ b/datafusion/functions-array/src/map.rs
@@ -15,17 +15,26 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::make_array::make_array;
+use arrow::array::ArrayData;
+use arrow_array::{Array, ArrayRef, MapArray, StructArray};
+use arrow_buffer::{Buffer, ToByteSlice};
+use arrow_schema::{DataType, Field, SchemaBuilder};
+use datafusion_common::{exec_err, ScalarValue};
+use datafusion_expr::expr::ScalarFunction;
+use datafusion_expr::{ColumnarValue, Expr, ScalarUDFImpl, Signature, Volatility};
 use std::any::Any;
 use std::collections::VecDeque;
 use std::sync::Arc;
 
-use arrow::array::{Array, ArrayData, ArrayRef, MapArray, StructArray};
-use arrow::datatypes::{DataType, Field, SchemaBuilder};
-use arrow_buffer::{Buffer, ToByteSlice};
+/// Returns a map created from a key list and a value list
+pub fn map(keys: Vec<Expr>, values: Vec<Expr>) -> Expr {
+    let keys = make_array(keys);
+    let values = make_array(values);
+    Expr::ScalarFunction(ScalarFunction::new_udf(map_udf(), vec![keys, values]))
+}
 
-use datafusion_common::Result;
-use datafusion_common::{exec_err, ScalarValue};
-use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
+create_func!(MapFunc, map_udf);
 
 /// Check if we can evaluate the expr to constant directly.
 ///
@@ -39,7 +48,7 @@ fn can_evaluate_to_const(args: &[ColumnarValue]) -> bool {
         .all(|arg| matches!(arg, ColumnarValue::Scalar(_)))
 }
 
-fn make_map_batch(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+fn make_map_batch(args: &[ColumnarValue]) -> datafusion_common::Result<ColumnarValue> {
     if args.len() != 2 {
         return exec_err!(
             "make_map requires exactly 2 arguments, got {} instead",
@@ -54,7 +63,9 @@ fn make_map_batch(args: &[ColumnarValue]) -> Result<ColumnarValue> {
     make_map_batch_internal(key, value, can_evaluate_to_const)
 }
 
-fn get_first_array_ref(columnar_value: &ColumnarValue) -> Result<ArrayRef> {
+fn get_first_array_ref(
+    columnar_value: &ColumnarValue,
+) -> datafusion_common::Result<ArrayRef> {
     match columnar_value {
         ColumnarValue::Scalar(value) => match value {
             ScalarValue::List(array) => Ok(array.value(0)),
@@ -70,7 +81,7 @@ fn make_map_batch_internal(
     keys: ArrayRef,
     values: ArrayRef,
     can_evaluate_to_const: bool,
-) -> Result<ColumnarValue> {
+) -> datafusion_common::Result<ColumnarValue> {
     if keys.null_count() > 0 {
         return exec_err!("map key cannot be null");
     }
@@ -150,7 +161,7 @@ impl ScalarUDFImpl for MapFunc {
         &self.signature
     }
 
-    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+    fn return_type(&self, arg_types: &[DataType]) -> datafusion_common::Result<DataType> {
         if arg_types.len() % 2 != 0 {
             return exec_err!(
                 "map requires an even number of arguments, got {} instead",
@@ -175,12 +186,12 @@ impl ScalarUDFImpl for MapFunc {
         ))
     }
 
-    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    fn invoke(&self, args: &[ColumnarValue]) -> datafusion_common::Result<ColumnarValue> {
         make_map_batch(args)
     }
 }
 
-fn get_element_type(data_type: &DataType) -> Result<&DataType> {
+fn get_element_type(data_type: &DataType) -> datafusion_common::Result<&DataType> {
     match data_type {
         DataType::List(element) => Ok(element.data_type()),
         DataType::LargeList(element) => Ok(element.data_type()),
diff --git a/datafusion/functions-array/src/planner.rs b/datafusion/functions-array/src/planner.rs
index fbb541d9b1..c63c2c83e6 100644
--- a/datafusion/functions-array/src/planner.rs
+++ b/datafusion/functions-array/src/planner.rs
@@ -27,6 +27,7 @@ use datafusion_expr::{
 use datafusion_functions::expr_fn::get_field;
 use datafusion_functions_aggregate::nth_value::nth_value_udaf;
 
+use crate::map::map_udf;
 use crate::{
     array_has::array_has_all,
     expr_fn::{array_append, array_concat, array_prepend},
@@ -111,10 +112,7 @@ impl ExprPlanner for ArrayFunctionPlanner {
         let values = make_array(values.into_iter().map(|(_, e)| e).collect());
 
         Ok(PlannerResult::Planned(Expr::ScalarFunction(
-            ScalarFunction::new_udf(
-                datafusion_functions::core::map(),
-                vec![keys, values],
-            ),
+            ScalarFunction::new_udf(map_udf(), vec![keys, values]),
         )))
     }
 }
diff --git a/datafusion/functions/Cargo.toml b/datafusion/functions/Cargo.toml
index b143080b19..0281676cab 100644
--- a/datafusion/functions/Cargo.toml
+++ b/datafusion/functions/Cargo.toml
@@ -141,8 +141,3 @@ required-features = ["string_expressions"]
 harness = false
 name = "upper"
 required-features = ["string_expressions"]
-
-[[bench]]
-harness = false
-name = "map"
-required-features = ["core_expressions"]
diff --git a/datafusion/functions/benches/map.rs b/datafusion/functions/benches/map.rs
deleted file mode 100644
index 811c21a41b..0000000000
--- a/datafusion/functions/benches/map.rs
+++ /dev/null
@@ -1,80 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-extern crate criterion;
-
-use arrow::array::{Int32Array, ListArray, StringArray};
-use arrow::datatypes::{DataType, Field};
-use arrow_buffer::{OffsetBuffer, ScalarBuffer};
-use criterion::{black_box, criterion_group, criterion_main, Criterion};
-use datafusion_common::ScalarValue;
-use datafusion_expr::ColumnarValue;
-use datafusion_functions::core::map;
-use rand::prelude::ThreadRng;
-use rand::Rng;
-use std::sync::Arc;
-
-fn keys(rng: &mut ThreadRng) -> Vec<String> {
-    let mut keys = vec![];
-    for _ in 0..1000 {
-        keys.push(rng.gen_range(0..9999).to_string());
-    }
-    keys
-}
-
-fn values(rng: &mut ThreadRng) -> Vec<i32> {
-    let mut values = vec![];
-    for _ in 0..1000 {
-        values.push(rng.gen_range(0..9999));
-    }
-    values
-}
-
-fn criterion_benchmark(c: &mut Criterion) {
-    c.bench_function("map_1000", |b| {
-        let mut rng = rand::thread_rng();
-        let field = Arc::new(Field::new("item", DataType::Utf8, true));
-        let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 1000]));
-        let key_list = ListArray::new(
-            field,
-            offsets,
-            Arc::new(StringArray::from(keys(&mut rng))),
-            None,
-        );
-        let field = Arc::new(Field::new("item", DataType::Int32, true));
-        let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 1000]));
-        let value_list = ListArray::new(
-            field,
-            offsets,
-            Arc::new(Int32Array::from(values(&mut rng))),
-            None,
-        );
-        let keys = ColumnarValue::Scalar(ScalarValue::List(Arc::new(key_list)));
-        let values = ColumnarValue::Scalar(ScalarValue::List(Arc::new(value_list)));
-
-        b.iter(|| {
-            black_box(
-                map()
-                    .invoke(&[keys.clone(), values.clone()])
-                    .expect("map should work on valid values"),
-            );
-        });
-    });
-}
-
-criterion_group!(benches, criterion_benchmark);
-criterion_main!(benches);
diff --git a/datafusion/functions/src/core/mod.rs b/datafusion/functions/src/core/mod.rs
index ee0309e593..8c51213972 100644
--- a/datafusion/functions/src/core/mod.rs
+++ b/datafusion/functions/src/core/mod.rs
@@ -25,7 +25,6 @@ pub mod arrowtypeof;
 pub mod coalesce;
 pub mod expr_ext;
 pub mod getfield;
-pub mod map;
 pub mod named_struct;
 pub mod nullif;
 pub mod nvl;
@@ -43,7 +42,6 @@ make_udf_function!(r#struct::StructFunc, STRUCT, r#struct);
 make_udf_function!(named_struct::NamedStructFunc, NAMED_STRUCT, named_struct);
 make_udf_function!(getfield::GetFieldFunc, GET_FIELD, get_field);
 make_udf_function!(coalesce::CoalesceFunc, COALESCE, coalesce);
-make_udf_function!(map::MapFunc, MAP, map);
 
 pub mod expr_fn {
     use datafusion_expr::{Expr, Literal};
@@ -80,10 +78,6 @@ pub mod expr_fn {
         coalesce,
         "Returns `coalesce(args...)`, which evaluates to the value of the first expr which is not NULL",
         args,
-    ),(
-        map,
-        "Returns a map created from a key list and a value list",
-        args,
     ));
 
     #[doc = "Returns the value of the field with the given name from the struct"]
@@ -101,6 +95,5 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
         arrow_typeof(),
         named_struct(),
         coalesce(),
-        map(),
     ]
 }
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 11945f3958..3476d5d042 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -44,6 +44,7 @@ use datafusion::functions_aggregate::expr_fn::{
     count_distinct, covar_pop, covar_samp, first_value, grouping, median, stddev,
     stddev_pop, sum, var_pop, var_sample,
 };
+use datafusion::functions_array::map::map;
 use datafusion::prelude::*;
 use datafusion::test_util::{TestTableFactory, TestTableProvider};
 use datafusion_common::config::TableOptions;
@@ -704,6 +705,10 @@ async fn roundtrip_expr_api() -> Result<()> {
         bool_or(lit(true)),
         array_agg(lit(1)),
         array_agg(lit(1)).distinct().build().unwrap(),
+        map(
+            vec![lit(1), lit(2), lit(3)],
+            vec![lit(10), lit(20), lit(30)],
+        ),
     ];
 
     // ensure expressions created with the expr api can be round tripped


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to