Copilot commented on code in PR #20358: URL: https://github.com/apache/datafusion/pull/20358#discussion_r2807664054
########## datafusion/spark/src/function/map/map_expr.rs: ########## @@ -0,0 +1,168 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; + +use crate::function::map::utils::{ + map_from_keys_values_offsets_nulls, map_type_from_key_value_types, +}; +use arrow::array::{Array, ArrayRef, NullArray, new_null_array}; +use arrow::buffer::OffsetBuffer; +use arrow::compute::concat; +use arrow::compute::kernels::cast; +use arrow::datatypes::DataType; +use datafusion_common::{Result, exec_err}; +use datafusion_expr::type_coercion::binary::comparison_coercion; +use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; +use datafusion_functions::utils::make_scalar_function; + +/// Spark-compatible `map` expression +/// <https://spark.apache.org/docs/latest/api/sql/index.html#map> +/// +/// Creates a map from alternating key-value pairs. +/// Example: map(key1, value1, key2, value2, ...) 
-> {key1: value1, key2: value2, ...} +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct Map { + signature: Signature, +} + +impl Default for Map { + fn default() -> Self { + Self::new() + } +} + +impl Map { + pub fn new() -> Self { + Self { + signature: Signature::variadic_any(Volatility::Immutable), + } + } +} + +impl ScalarUDFImpl for Map { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "map" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> { + if arg_types.is_empty() { + return exec_err!("map requires at least one pair of arguments, got 0"); + } + if !arg_types.len().is_multiple_of(2) { + return exec_err!( + "map requires an even number of arguments, got {}", + arg_types.len() + ); + } + + let key_type = arg_types + .iter() + .step_by(2) + .cloned() + .reduce(|common, t| comparison_coercion(&common, &t).unwrap_or(common)) + .unwrap(); + let value_type = arg_types + .iter() + .skip(1) + .step_by(2) + .cloned() + .reduce(|common, t| comparison_coercion(&common, &t).unwrap_or(common)) + .unwrap(); + + Ok(map_type_from_key_value_types(&key_type, &value_type)) + } + + fn invoke_with_args( + &self, + args: datafusion_expr::ScalarFunctionArgs, + ) -> Result<ColumnarValue> { + make_scalar_function(map_inner, vec![])(&args.args) + } +} + +fn map_inner(args: &[ArrayRef]) -> Result<ArrayRef> { + let num_rows = args[0].len(); + let num_pairs = args.len() / 2; + + let all_null = args + .iter() + .all(|arg| matches!(arg.data_type(), DataType::Null)); + + if all_null { + return Ok(cast( + &NullArray::new(num_rows), + &map_type_from_key_value_types(&DataType::Null, &DataType::Null), + )?); + } + + // Collect key arrays and value arrays from alternating arguments + let key_arrays: Vec<&dyn Array> = + (0..num_pairs).map(|i| args[i * 2].as_ref()).collect(); + let value_arrays: Vec<&dyn Array> = + (0..num_pairs).map(|i| args[i * 2 + 1].as_ref()).collect(); + + // 
Concatenate all keys and all values into flat arrays + let flat_keys: ArrayRef = if key_arrays.is_empty() { + new_null_array(args[0].data_type(), 0) + } else { + concat(&key_arrays)? + }; + let flat_values: ArrayRef = if value_arrays.is_empty() { + new_null_array(args[1].data_type(), 0) + } else { + concat(&value_arrays)? + }; + + // flat_keys layout: [row0_key0, row1_key0, ..., rowN_key0, row0_key1, row1_key1, ..., rowN_key1, ...] + // But we need: [row0_key0, row0_key1, ..., row0_keyM, row1_key0, row1_key1, ..., row1_keyM, ...] + // Rearrange for each row, gather keys from each pair. + // Source index for (row, pair) is (pair * num_rows + row) + let total_entries = num_rows * num_pairs; + let mut reorder_indices = Vec::with_capacity(total_entries); + for row in 0..num_rows { + for pair in 0..num_pairs { + reorder_indices.push((pair * num_rows + row) as u32); + } + } + + let indices = arrow::array::UInt32Array::from(reorder_indices); + let flat_keys = arrow::compute::take(&flat_keys, &indices, None)?; + let flat_values = arrow::compute::take(&flat_values, &indices, None)?; + + let offsets: Vec<i32> = (0..=num_rows as i32) + .map(|i| i * num_pairs as i32) + .collect(); + let offsets_buffer = OffsetBuffer::new(offsets.into()); + + map_from_keys_values_offsets_nulls( + &flat_keys, + &flat_values, + offsets_buffer.as_ref(), + offsets_buffer.as_ref(), + None, + None, + ) +} Review Comment: This implementation has a fundamental architectural mismatch with how DataFusion processes the map function: 1. **Wrong argument pattern**: The implementation expects alternating key-value arguments (k1, v1, k2, v2, ...) and processes them with complex reordering logic (lines 122-153). However, DataFusion's NestedFunctionPlanner intercepts map(k1, v1, k2, v2) calls and converts them to map([k1, k2], [v1, v2]) - two array arguments - before this function is invoked. 2. 
**Should follow MapFunc pattern**: This should match the built-in MapFunc pattern (datafusion/functions-nested/src/map.rs lines 258-301): use variadic_any signature but expect exactly 2 array arguments at runtime via take_function_args, then extract list values and offsets. 3. **Missing duplicate key validation**: The test at lines 55-56 of map.slt expects "map key must be unique, duplicate key found: 1" but map_from_keys_values_offsets_nulls uses LAST_WIN behavior (it silently keeps only the last value for each duplicate key instead of raising an error). Validation similar to MapFunc::validate_map_keys (functions-nested/src/map.rs:95-114) is needed. 4. **Incorrect return_type implementation**: Lines 70-96 process alternating arguments but should process two array arguments like MapFunc::return_type (functions-nested/src/map.rs:271-289). The entire implementation needs to be restructured to accept two array arguments (keys and values) instead of variadic alternating arguments. ########## datafusion/spark/src/function/map/map_expr.rs: ########## @@ -0,0 +1,168 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +use std::any::Any; + +use crate::function::map::utils::{ + map_from_keys_values_offsets_nulls, map_type_from_key_value_types, +}; +use arrow::array::{Array, ArrayRef, NullArray, new_null_array}; +use arrow::buffer::OffsetBuffer; +use arrow::compute::concat; +use arrow::compute::kernels::cast; +use arrow::datatypes::DataType; +use datafusion_common::{Result, exec_err}; +use datafusion_expr::type_coercion::binary::comparison_coercion; +use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; +use datafusion_functions::utils::make_scalar_function; + +/// Spark-compatible `map` expression +/// <https://spark.apache.org/docs/latest/api/sql/index.html#map> +/// +/// Creates a map from alternating key-value pairs. +/// Example: map(key1, value1, key2, value2, ...) -> {key1: value1, key2: value2, ...} +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct Map { + signature: Signature, +} + +impl Default for Map { + fn default() -> Self { + Self::new() + } +} + +impl Map { + pub fn new() -> Self { + Self { + signature: Signature::variadic_any(Volatility::Immutable), + } + } +} + +impl ScalarUDFImpl for Map { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "map" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> { + if arg_types.is_empty() { + return exec_err!("map requires at least one pair of arguments, got 0"); Review Comment: The error message here says "map requires at least one pair of arguments" but the test at line 59 expects "Function 'map' expected at least one argument but received 0". This discrepancy suggests the validation might occur elsewhere (likely in the built-in planner's handling before this function is reached). However, if this validation path is reached, the error message should be consistent with Spark's expected behavior or the test expectations should be updated. 
```suggestion return exec_err!("Function 'map' expected at least one argument but received 0"); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
