This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 2d947b31d0 feat(spark): implement Spark `map` function `map_from_entries` (#17779)
2d947b31d0 is described below
commit 2d947b31d0355710dc179d1d72ca5366b7721b2a
Author: Evgenii Glotov <[email protected]>
AuthorDate: Mon Sep 29 06:00:55 2025 +0300
feat(spark): implement Spark `map` function `map_from_entries` (#17779)
* feat(spark): implement Spark `map` function `map_from_entries`
* fix: map_from_entries with null entries in lists, chore: refactor initial
offsets, add tests
---
.../spark/src/function/map/map_from_entries.rs | 133 +++++++++++++++++
datafusion/spark/src/function/map/mod.rs | 10 +-
datafusion/spark/src/function/map/utils.rs | 65 ++++----
.../test_files/spark/map/map_from_entries.slt | 164 +++++++++++++++++++++
4 files changed, 343 insertions(+), 29 deletions(-)
diff --git a/datafusion/spark/src/function/map/map_from_entries.rs b/datafusion/spark/src/function/map/map_from_entries.rs
new file mode 100644
index 0000000000..6648979c5d
--- /dev/null
+++ b/datafusion/spark/src/function/map/map_from_entries.rs
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+
+use crate::function::map::utils::{
+ get_element_type, get_list_offsets, get_list_values,
+ map_from_keys_values_offsets_nulls, map_type_from_key_value_types,
+};
+use arrow::array::{Array, ArrayRef, NullBufferBuilder, StructArray};
+use arrow::buffer::NullBuffer;
+use arrow::datatypes::DataType;
+use datafusion_common::utils::take_function_args;
+use datafusion_common::{exec_err, Result};
+use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
+use datafusion_functions::utils::make_scalar_function;
+
+/// Spark-compatible `map_from_entries` expression
+/// <https://spark.apache.org/docs/latest/api/sql/index.html#map_from_entries>
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct MapFromEntries {
+ signature: Signature,
+}
+
+impl Default for MapFromEntries {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl MapFromEntries {
+ pub fn new() -> Self {
+ Self {
+ signature: Signature::array(Volatility::Immutable),
+ }
+ }
+}
+
+impl ScalarUDFImpl for MapFromEntries {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn name(&self) -> &str {
+ "map_from_entries"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        let [entries_type] = take_function_args("map_from_entries", arg_types)?;
+ let entries_element_type = get_element_type(entries_type)?;
+ let (keys_type, values_type) = match entries_element_type {
+ DataType::Struct(fields) if fields.len() == 2 => {
+ Ok((fields[0].data_type(), fields[1].data_type()))
+ }
+ wrong_type => exec_err!(
+                "map_from_entries: expected array<struct<key, value>>, got {:?}",
+ wrong_type
+ ),
+ }?;
+ Ok(map_type_from_key_value_types(keys_type, values_type))
+ }
+
+ fn invoke_with_args(
+ &self,
+ args: datafusion_expr::ScalarFunctionArgs,
+ ) -> Result<ColumnarValue> {
+ make_scalar_function(map_from_entries_inner, vec![])(&args.args)
+ }
+}
+
+fn map_from_entries_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
+ let [entries] = take_function_args("map_from_entries", args)?;
+ let entries_offsets = get_list_offsets(entries)?;
+ let entries_values = get_list_values(entries)?;
+
+ let (flat_keys, flat_values) =
+ match entries_values.as_any().downcast_ref::<StructArray>() {
+ Some(a) => Ok((a.column(0), a.column(1))),
+ None => exec_err!(
+                "map_from_entries: expected array<struct<key, value>>, got {:?}",
+ entries_values.data_type()
+ ),
+ }?;
+
+    let entries_with_nulls = entries_values.nulls().and_then(|entries_inner_nulls| {
+ let mut builder = NullBufferBuilder::new_with_len(0);
+ let mut cur_offset = entries_offsets
+ .first()
+ .map(|offset| *offset as usize)
+ .unwrap_or(0);
+
+ for next_offset in entries_offsets.iter().skip(1) {
+ let num_entries = *next_offset as usize - cur_offset;
+ builder.append(
+ entries_inner_nulls
+ .slice(cur_offset, num_entries)
+ .null_count()
+ == 0,
+ );
+ cur_offset = *next_offset as usize;
+ }
+ builder.finish()
+ });
+
+    let res_nulls = NullBuffer::union(entries.nulls(), entries_with_nulls.as_ref());
+
+ map_from_keys_values_offsets_nulls(
+ flat_keys,
+ flat_values,
+ &entries_offsets,
+ &entries_offsets,
+ None,
+ res_nulls.as_ref(),
+ )
+}
diff --git a/datafusion/spark/src/function/map/mod.rs b/datafusion/spark/src/function/map/mod.rs
index 21d1e0f108..2f596b19b4 100644
--- a/datafusion/spark/src/function/map/mod.rs
+++ b/datafusion/spark/src/function/map/mod.rs
@@ -16,6 +16,7 @@
// under the License.
pub mod map_from_arrays;
+pub mod map_from_entries;
mod utils;
use datafusion_expr::ScalarUDF;
@@ -23,6 +24,7 @@ use datafusion_functions::make_udf_function;
use std::sync::Arc;
make_udf_function!(map_from_arrays::MapFromArrays, map_from_arrays);
+make_udf_function!(map_from_entries::MapFromEntries, map_from_entries);
pub mod expr_fn {
use datafusion_functions::export_functions;
@@ -32,8 +34,14 @@ pub mod expr_fn {
"Creates a map from arrays of keys and values.",
keys values
));
+
+ export_functions!((
+ map_from_entries,
+ "Creates a map from array<struct<key, value>>.",
+ arg1
+ ));
}
pub fn functions() -> Vec<Arc<ScalarUDF>> {
- vec![map_from_arrays()]
+ vec![map_from_arrays(), map_from_entries()]
}
diff --git a/datafusion/spark/src/function/map/utils.rs b/datafusion/spark/src/function/map/utils.rs
index fa4fc5fae4..b568f45403 100644
--- a/datafusion/spark/src/function/map/utils.rs
+++ b/datafusion/spark/src/function/map/utils.rs
@@ -157,8 +157,15 @@ fn map_deduplicate_keys(
let offsets_len = keys_offsets.len();
let mut new_offsets = Vec::with_capacity(offsets_len);
- let mut cur_keys_offset = 0;
- let mut cur_values_offset = 0;
+ let mut cur_keys_offset = keys_offsets
+ .first()
+ .map(|offset| *offset as usize)
+ .unwrap_or(0);
+ let mut cur_values_offset = values_offsets
+ .first()
+ .map(|offset| *offset as usize)
+ .unwrap_or(0);
+
let mut new_last_offset = 0;
new_offsets.push(new_last_offset);
@@ -176,36 +183,38 @@ fn map_deduplicate_keys(
let mut keys_mask_one = [false].repeat(num_keys_entries);
let mut values_mask_one = [false].repeat(num_values_entries);
- if num_keys_entries != num_values_entries {
-        let key_is_valid = keys_nulls.is_none_or(|buf| buf.is_valid(row_idx));
-        let value_is_valid = values_nulls.is_none_or(|buf| buf.is_valid(row_idx));
- if key_is_valid && value_is_valid {
+ let key_is_valid = keys_nulls.is_none_or(|buf| buf.is_valid(row_idx));
+    let value_is_valid = values_nulls.is_none_or(|buf| buf.is_valid(row_idx));
+
+ if key_is_valid && value_is_valid {
+ if num_keys_entries != num_values_entries {
             return exec_err!("map_deduplicate_keys: keys and values lists in the same row must have equal lengths");
+ } else if num_keys_entries != 0 {
+ let mut seen_keys = HashSet::new();
+
+ for cur_entry_idx in (0..num_keys_entries).rev() {
+ let key = ScalarValue::try_from_array(
+ &flat_keys,
+ cur_keys_offset + cur_entry_idx,
+ )?
+ .compacted();
+ if seen_keys.contains(&key) {
+                        // TODO: implement configuration and logic for spark.sql.mapKeyDedupPolicy=EXCEPTION (this is default spark-config)
+                        // exec_err!("invalid argument: duplicate keys in map")
+                        // https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961
+                    } else {
+                        // This code implements deduplication logic for spark.sql.mapKeyDedupPolicy=LAST_WIN (this is NOT default spark-config)
+ keys_mask_one[cur_entry_idx] = true;
+ values_mask_one[cur_entry_idx] = true;
+ seen_keys.insert(key);
+ new_last_offset += 1;
+ }
+ }
}
- // else the result entry is NULL
+ } else {
+ // the result entry is NULL
// both current row offsets are skipped
// keys or values in the current row are marked false in the masks
- } else if num_keys_entries != 0 {
- let mut seen_keys = HashSet::new();
-
- for cur_entry_idx in (0..num_keys_entries).rev() {
- let key = ScalarValue::try_from_array(
- &flat_keys,
- cur_keys_offset + cur_entry_idx,
- )?
- .compacted();
- if seen_keys.contains(&key) {
-                // TODO: implement configuration and logic for spark.sql.mapKeyDedupPolicy=EXCEPTION (this is default spark-config)
-                // exec_err!("invalid argument: duplicate keys in map")
-                // https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961
-            } else {
-                // This code implements deduplication logic for spark.sql.mapKeyDedupPolicy=LAST_WIN (this is NOT default spark-config)
- keys_mask_one[cur_entry_idx] = true;
- values_mask_one[cur_entry_idx] = true;
- seen_keys.insert(key);
- new_last_offset += 1;
- }
- }
}
keys_mask_builder.append_array(&keys_mask_one.into());
values_mask_builder.append_array(&values_mask_one.into());
diff --git a/datafusion/sqllogictest/test_files/spark/map/map_from_entries.slt b/datafusion/sqllogictest/test_files/spark/map/map_from_entries.slt
new file mode 100644
index 0000000000..19b46886a0
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/spark/map/map_from_entries.slt
@@ -0,0 +1,164 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Spark doctests
+query ?
+SELECT map_from_entries(array[struct(1, 'a'), struct(2, 'b')]);
+----
+{1: a, 2: b}
+
+query ?
+SELECT map_from_entries(array[struct(1, cast(null as string)), struct(2, 'b')]);
+----
+{1: NULL, 2: b}
+
+query ?
+SELECT map_from_entries(data)
+from values
+ (array[struct(1, 'a'), struct(2, 'b')]),
+ (array[struct(3, 'c')])
+as tab(data);
+----
+{1: a, 2: b}
+{3: c}
+
+# Tests with NULL and empty input structarrays
+query ?
+SELECT map_from_entries(data)
+from values
+ (cast(array[] as array<struct<int, string>>)),
+ (cast(NULL as array<struct<int, string>>))
+as tab(data);
+----
+{}
+NULL
+
+# Test with NULL key, should fail
+query error DataFusion error: Arrow error: Invalid argument error: Found unmasked nulls for non-nullable StructArray field "key"
+SELECT map_from_entries(array[struct(NULL, 1)]);
+
+# Tests with NULL and array of Null type, should fail
+query error DataFusion error: Execution error: map_from_entries: expected array<struct<key, value>>, got Null
+SELECT map_from_entries(NULL);
+
+query error DataFusion error: Execution error: map_from_entries: expected array<struct<key, value>>, got Null
+SELECT map_from_entries(array[NULL]);
+
+# Test with NULL array and NULL entries in arrays
+# output is NULL if any entry is NULL
+query ?
+SELECT map_from_entries(data)
+from values
+ (
+ array[
+ struct(1 as a, 'a' as b),
+ cast(NULL as struct<a int, b string>),
+ cast(NULL as struct<a int, b string>)
+ ]
+ ),
+ (NULL),
+ (
+ array[
+ struct(2 as a, 'b' as b),
+ struct(3 as a, 'c' as b)
+ ]
+ ),
+ (
+ array[
+ struct(4 as a, 'd' as b),
+ cast(NULL as struct<a int, b string>),
+ struct(5 as a, 'e' as b),
+ struct(6 as a, 'f' as b)
+ ]
+ )
+as tab(data);
+----
+NULL
+NULL
+{2: b, 3: c}
+NULL
+
+#Test with multiple rows: good, empty and nullable
+query ?
+SELECT map_from_entries(data)
+from values
+ (NULL),
+ (array[
+ struct(1 as a, 'b' as b),
+ struct(2 as a, cast(NULL as string) as b),
+ struct(3 as a, 'd' as b)
+ ]),
+ (array[]),
+ (NULL)
+as tab(data);
+----
+NULL
+{1: b, 2: NULL, 3: d}
+{}
+NULL
+
+# Test with complex types
+query ?
+SELECT map_from_entries(array[
+ struct(array('a', 'b'), struct(1, 2, 3)),
+ struct(array('c', 'd'), struct(4, 5, 6))
+]);
+----
+{[a, b]: {c0: 1, c1: 2, c2: 3}, [c, d]: {c0: 4, c1: 5, c2: 6}}
+
+# Test with nested function calls
+query ?
+SELECT
+ map_from_entries(
+ array[
+ struct(
+ 'outer_key1',
+ -- value for outer_key1: a map itself
+ map_from_entries(
+ array[
+ struct('inner_a', 1),
+ struct('inner_b', 2)
+ ]
+ )
+ ),
+ struct(
+ 'outer_key2',
+ -- value for outer_key2: another map
+ map_from_entries(
+ array[
+ struct('inner_x', 10),
+ struct('inner_y', 20),
+ struct('inner_z', 30)
+ ]
+ )
+ )
+ ]
+ ) AS nested_map;
+----
+{outer_key1: {inner_a: 1, inner_b: 2}, outer_key2: {inner_x: 10, inner_y: 20, inner_z: 30}}
+
+# Test with duplicate keys
+query ?
+SELECT map_from_entries(array(
+ struct(true, 'a'),
+ struct(false, 'b'),
+ struct(true, 'c'),
+ struct(false, cast(NULL as string)),
+ struct(true, 'd')
+));
+----
+{false: NULL, true: d}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]