Copilot commented on code in PR #2141:
URL: https://github.com/apache/auron/pull/2141#discussion_r3020651212


##########
native-engine/datafusion-ext-functions/src/spark_map.rs:
##########
@@ -0,0 +1,504 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{collections::HashSet, sync::Arc};
+
+use arrow::{
+    array::{Array, ArrayRef, MapArray, StructArray, new_empty_array},
+    buffer::{NullBuffer, OffsetBuffer, ScalarBuffer},
+    datatypes::{DataType, Field},
+};
+use datafusion::{
+    common::{Result, ScalarValue},
+    logical_expr::ColumnarValue,
+};
+use datafusion_ext_commons::{df_execution_err, 
scalar_value::compacted_scalar_value_from_array};
+
+fn get_map_type(args: &[ColumnarValue]) -> Result<(Arc<Field>, bool)> {
+    let (entries_field, ordered) = args
+        .iter()
+        .find_map(|arg| match arg.data_type() {
+            DataType::Map(entries_field, ordered) => Some((entries_field, 
ordered)),
+            DataType::Null => None,
+            _ => None,
+        })
+        .ok_or_else(|| {
+            datafusion::error::DataFusionError::Execution(
+                "map_concat requires at least one map argument".to_string(),
+            )
+        })?;
+
+    validate_map_arg_types(args, &entries_field, ordered)?;
+    Ok((entries_field, ordered))
+}
+
+fn validate_map_arg_types(
+    args: &[ColumnarValue],
+    expected_entries_field: &Arc<Field>,
+    expected_ordered: bool,
+) -> Result<()> {
+    for arg in args {
+        match arg.data_type() {
+            DataType::Map(entries_field, ordered) => {
+                if entries_field != *expected_entries_field || ordered != 
expected_ordered {
+                    return df_execution_err!(
+                        "map_concat requires all map args to have the same 
type, expected {:?}, found {:?}",
+                        DataType::Map(expected_entries_field.clone(), 
expected_ordered),
+                        DataType::Map(entries_field, ordered)
+                    );
+                }
+            }
+            DataType::Null => {}
+            data_type => {
+                return df_execution_err!("map_concat args must be map, found 
{data_type:?}");
+            }
+        }
+    }
+    Ok(())
+}
+
+fn extract_map_entry_fields(entries_field: &Arc<Field>) -> Result<(Arc<Field>, 
Arc<Field>)> {
+    let fields = match entries_field.data_type() {
+        DataType::Struct(fields) => fields,
+        _ => return df_execution_err!("map_concat map entries field must be 
struct"),
+    };
+
+    if fields.len() != 2 {
+        return df_execution_err!(
+            "map_concat map entries struct must contain exactly 2 fields, 
found {}",
+            fields.len()
+        );
+    }
+
+    Ok((fields[0].clone(), fields[1].clone()))
+}
+
+fn new_null_map_array(entries_field: Arc<Field>, ordered: bool, len: usize) -> 
Result<MapArray> {
+    let (key_field, value_field) = extract_map_entry_fields(&entries_field)?;
+
+    let entries = StructArray::from(vec![
+        (
+            key_field.clone(),
+            new_empty_array(key_field.data_type()) as ArrayRef,
+        ),
+        (
+            value_field.clone(),
+            new_empty_array(value_field.data_type()) as ArrayRef,
+        ),
+    ]);
+
+    Ok(MapArray::new(
+        entries_field,
+        OffsetBuffer::new(ScalarBuffer::from(vec![0i32; len + 1])),
+        entries,
+        Some(NullBuffer::from(vec![false; len])),
+        ordered,
+    ))
+}
+
+fn as_map_array(array: &ArrayRef) -> Result<MapArray> {
+    array
+        .as_any()
+        .downcast_ref::<MapArray>()
+        .cloned()
+        .ok_or_else(|| {
+            datafusion::error::DataFusionError::Execution(format!(
+                "map_concat args must be map, found {:?}",
+                array.data_type()
+            ))
+        })
+}
+
+fn columnar_value_to_map_array(
+    arg: &ColumnarValue,
+    entries_field: &Arc<Field>,
+    ordered: bool,
+) -> Result<MapArray> {
+    match arg {
+        ColumnarValue::Array(array) if matches!(array.data_type(), 
DataType::Null) => {
+            new_null_map_array(entries_field.clone(), ordered, array.len())
+        }
+        ColumnarValue::Array(array) => as_map_array(array),
+        ColumnarValue::Scalar(scalar) if scalar.is_null() => {
+            new_null_map_array(entries_field.clone(), ordered, 1)
+        }
+        ColumnarValue::Scalar(scalar) => {
+            let array = scalar.to_array()?;
+            as_map_array(&array)
+        }
+    }
+}
+
+fn get_arg_arrays(
+    args: &[ColumnarValue],
+    entries_field: &Arc<Field>,
+    ordered: bool,
+) -> Result<Vec<MapArray>> {
+    args.iter()
+        .map(|arg| columnar_value_to_map_array(arg, entries_field, ordered))
+        .collect()
+}
+
+/// Returns the union of all given maps.
+///
+/// This follows Spark's default duplicate-key behavior by raising an error,
+/// and propagates null when any input map for a row is null.
+pub fn map_concat(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    let (entries_field, ordered) = get_map_type(args)?;
+    let arg_arrays = get_arg_arrays(args, &entries_field, ordered)?;
+
+    let num_rows = arg_arrays
+        .iter()
+        .map(|array| array.len())
+        .filter(|&len| len != 1)
+        .max()
+        .unwrap_or(1);
+
+    if arg_arrays
+        .iter()
+        .any(|array| array.len() != 1 && array.len() != num_rows)
+    {
+        return df_execution_err!("all maps of map_concat must have the same 
length");
+    }
+
+    let (key_field, value_field) = extract_map_entry_fields(&entries_field)?;
+
+    let mut all_keys = Vec::new();
+    let mut all_values = Vec::new();
+    let mut offsets = Vec::with_capacity(num_rows + 1);
+    let mut valids = Vec::with_capacity(num_rows);
+    let mut next_offset = 0i32;
+
+    offsets.push(next_offset);
+
+    for row_idx in 0..num_rows {
+        let mut row_keys = HashSet::new();
+        let mut row_entries: Vec<(ScalarValue, ScalarValue)> = Vec::new();
+        let mut row_is_null = false;
+
+        for array in &arg_arrays {
+            let idx = if array.len() == 1 { 0 } else { row_idx };
+
+            if array.is_null(idx) {
+                row_is_null = true;
+                break;
+            }
+
+            let entries = array.value(idx);
+            let entries = entries
+                .as_any()
+                .downcast_ref::<StructArray>()
+                .ok_or_else(|| {
+                    datafusion::error::DataFusionError::Execution(
+                        "map_concat expects map entries to be 
struct".to_string(),
+                    )
+                })?;
+
+            let keys = entries.column(0);
+            let values = entries.column(1);
+
+            for i in 0..entries.len() {
+                let key = compacted_scalar_value_from_array(keys.as_ref(), i)?;
+                if !row_keys.insert(key.clone()) {
+                    return df_execution_err!("map_concat duplicate key found: 
{key}");
+                }
+
+                let value = compacted_scalar_value_from_array(values.as_ref(), 
i)?;
+                row_entries.push((key, value));

Review Comment:
   map_concat currently allows null keys: 
`compacted_scalar_value_from_array(keys, i)?` may produce a null ScalarValue, 
which then gets inserted into the output map. Spark MapType disallows null keys 
and other map code in this repo already treats null keys as unsupported; 
consider explicitly checking `keys.is_null(i)` and returning an execution error 
(before duplicate-key handling) to avoid producing invalid MapArrays or 
mismatching Spark behavior.



##########
native-engine/datafusion-ext-functions/src/spark_map.rs:
##########
@@ -0,0 +1,504 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{collections::HashSet, sync::Arc};
+
+use arrow::{
+    array::{Array, ArrayRef, MapArray, StructArray, new_empty_array},
+    buffer::{NullBuffer, OffsetBuffer, ScalarBuffer},
+    datatypes::{DataType, Field},
+};
+use datafusion::{
+    common::{Result, ScalarValue},
+    logical_expr::ColumnarValue,
+};
+use datafusion_ext_commons::{df_execution_err, 
scalar_value::compacted_scalar_value_from_array};
+
+fn get_map_type(args: &[ColumnarValue]) -> Result<(Arc<Field>, bool)> {
+    let (entries_field, ordered) = args
+        .iter()
+        .find_map(|arg| match arg.data_type() {
+            DataType::Map(entries_field, ordered) => Some((entries_field, 
ordered)),
+            DataType::Null => None,
+            _ => None,
+        })
+        .ok_or_else(|| {
+            datafusion::error::DataFusionError::Execution(
+                "map_concat requires at least one map argument".to_string(),
+            )
+        })?;

Review Comment:
   If `map_concat` is invoked with non-map arguments only (e.g., due to an 
unexpected planner/converter bug), `get_map_type` currently returns the 
generic "requires at least one map argument" error because it ignores non-map 
types when searching. Consider distinguishing the empty-argument case 
(`args.is_empty()`) from the case where no argument is map-typed, so that 
invalid types produce a clearer "args must be map" execution error.
   ```suggestion
       if args.is_empty() {
           return df_execution_err!("map_concat requires at least one map 
argument");
       }
   
       let (entries_field, ordered) = match args
           .iter()
           .find_map(|arg| match arg.data_type() {
               DataType::Map(entries_field, ordered) => Some((entries_field, 
ordered)),
               DataType::Null => None,
               _ => None,
           }) {
           Some((entries_field, ordered)) => (entries_field, ordered),
           None => {
               return df_execution_err!("map_concat args must be map");
           }
       };
   ```



##########
native-engine/datafusion-ext-functions/src/spark_map.rs:
##########
@@ -0,0 +1,504 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{collections::HashSet, sync::Arc};
+
+use arrow::{
+    array::{Array, ArrayRef, MapArray, StructArray, new_empty_array},
+    buffer::{NullBuffer, OffsetBuffer, ScalarBuffer},
+    datatypes::{DataType, Field},
+};
+use datafusion::{
+    common::{Result, ScalarValue},
+    logical_expr::ColumnarValue,
+};
+use datafusion_ext_commons::{df_execution_err, 
scalar_value::compacted_scalar_value_from_array};
+
+fn get_map_type(args: &[ColumnarValue]) -> Result<(Arc<Field>, bool)> {
+    let (entries_field, ordered) = args
+        .iter()
+        .find_map(|arg| match arg.data_type() {
+            DataType::Map(entries_field, ordered) => Some((entries_field, 
ordered)),
+            DataType::Null => None,
+            _ => None,
+        })
+        .ok_or_else(|| {
+            datafusion::error::DataFusionError::Execution(
+                "map_concat requires at least one map argument".to_string(),
+            )
+        })?;
+
+    validate_map_arg_types(args, &entries_field, ordered)?;
+    Ok((entries_field, ordered))
+}
+
+fn validate_map_arg_types(
+    args: &[ColumnarValue],
+    expected_entries_field: &Arc<Field>,
+    expected_ordered: bool,
+) -> Result<()> {
+    for arg in args {
+        match arg.data_type() {
+            DataType::Map(entries_field, ordered) => {
+                if entries_field != *expected_entries_field || ordered != 
expected_ordered {
+                    return df_execution_err!(
+                        "map_concat requires all map args to have the same 
type, expected {:?}, found {:?}",
+                        DataType::Map(expected_entries_field.clone(), 
expected_ordered),
+                        DataType::Map(entries_field, ordered)
+                    );
+                }

Review Comment:
   Type checking is comparing the full `entries_field` (`Arc<Field>`) for 
equality. Because Arrow `Field` equality includes name/nullable/metadata, this 
can reject logically compatible maps that share the same key/value types but 
differ in field nullability/metadata (e.g., maps constructed by different code 
paths). Consider validating based on extracted key/value `DataType` (and 
`ordered`) instead of strict `Field` equality.
   ```suggestion
       // Determine the expected key and value types from the map entries 
struct.
       let (expected_key_field, expected_value_field) =
           extract_map_entry_fields(expected_entries_field)?;
       let expected_key_type = expected_key_field.data_type();
       let expected_value_type = expected_value_field.data_type();
   
       for arg in args {
           match arg.data_type() {
               DataType::Map(entries_field, ordered) => {
                   // First ensure the `ordered` property matches.
                   if ordered != expected_ordered {
                       return df_execution_err!(
                           "map_concat requires all map args to have the same 
type, expected {:?}, found {:?}",
                           DataType::Map(expected_entries_field.clone(), 
expected_ordered),
                           DataType::Map(entries_field.clone(), ordered)
                       );
                   }
   
                   // Compare key and value data types while ignoring field 
name, nullability, and metadata.
                   let (key_field, value_field) = 
extract_map_entry_fields(&entries_field)?;
                   if key_field.data_type() != expected_key_type
                       || value_field.data_type() != expected_value_type
                   {
                       return df_execution_err!(
                           "map_concat requires all map args to have the same 
type, expected {:?}, found {:?}",
                           DataType::Map(expected_entries_field.clone(), 
expected_ordered),
                           DataType::Map(entries_field.clone(), ordered)
                       );
                   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to