This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 7567b7aacf optimize(concat): concat map implementation (#10048)
7567b7aacf is described below
commit 7567b7aacf3c4f7ea21d78dda2054f5312ef80aa
Author: mwish <[email protected]>
AuthorDate: Fri Jun 12 05:12:28 2026 +0800
optimize(concat): concat map implementation (#10048)
# Which issue does this PR close?
- Closes #10047.
# Rationale for this change
Implement concat for map
# What changes are included in this PR?
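Add `concat_maps` to `arrow-select/src/concat.rs`, route `DataType::Map` to it from `concat`, and add unit tests covering plain, sliced, null-containing, and empty maps.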
# Are these changes tested?
Yes
# Are there any user-facing changes?
No
---------
Co-authored-by: Andrew Lamb <[email protected]>
Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
---
arrow-select/src/concat.rs | 183 ++++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 182 insertions(+), 1 deletion(-)
diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs
index 19eeeacbfb..bb85d7035a 100644
--- a/arrow-select/src/concat.rs
+++ b/arrow-select/src/concat.rs
@@ -208,6 +208,72 @@ fn concat_lists<OffsetSize: OffsetSizeTrait>(
Ok(Arc::new(array))
}
+fn concat_maps( // Concatenates the given map arrays into a single `MapArray`, preserving row order.
+ arrays: &[&dyn Array], // each element is downcast with `as_map()`
+ field: &FieldRef, // entries field, reused for the output via `Arc::clone`
+ ordered: bool, // passed through unchanged to `MapArray::try_new`
+) -> Result<ArrayRef, ArrowError> {
+ let mut output_len = 0; // total number of rows across all inputs
+ let mut map_has_nulls = false; // set once any input has a non-zero null count
+ let mut map_has_slices = false; // set once any input's offsets do not cover its whole entries array
+
+ let maps = arrays
+ .iter()
+ .map(|x| x.as_map())
+ .inspect(|m| { // gather the three facts above in the same pass that collects the maps
+ output_len += m.len();
+ map_has_nulls |= m.null_count() != 0;
+ map_has_slices |=
+ m.offsets()[0] > 0 || m.offsets().last().unwrap().as_usize() <
m.entries().len();
+ })
+ .collect::<Vec<_>>();
+
+ let map_nulls = map_has_nulls.then(|| { // a validity buffer is only built when some input has nulls
+ let mut nulls = BooleanBufferBuilder::new(output_len);
+ for m in &maps {
+ match m.nulls() {
+ Some(n) => nulls.append_buffer(n.inner()), // copy this input's validity bits
+ None => nulls.append_n(m.len(), true), // no null buffer: mark every row of this input valid
+ }
+ }
+ NullBuffer::new(nulls.finish())
+ });
+
+ // If any input's offsets start after 0 or end before its entries array does,
+ // trim every input's entries to its own offset range before concatenating
+ let mut sliced_entries: Vec<ArrayRef>; // declared outside the `if` so `entries` can borrow from it below
+ let entries: Vec<&dyn Array> = if map_has_slices {
+ sliced_entries = Vec::with_capacity(maps.len());
+ for m in &maps {
+ let offsets = m.offsets();
+ let start_offset = offsets[0].as_usize(); // first entry referenced by this input
+ let end_offset = offsets.last().unwrap().as_usize(); // one past the last entry referenced
+ let entries_arr: &dyn Array = m.entries();
+ sliced_entries.push(entries_arr.slice(start_offset, end_offset -
start_offset));
+ }
+ sliced_entries.iter().map(|a| a.as_ref()).collect()
+ } else {
+ maps.iter().map(|m| m.entries() as &dyn Array).collect() // no trimming needed: use the entries arrays as they are
+ };
+
+ let concatenated_entries = concat(entries.as_slice())?; // delegates to `concat` for the entries arrays
+
+ // Rebuild the offsets from every map's per-row lengths, in input order
+ let value_offset_buffer =
+ OffsetBuffer::<i32>::from_lengths(maps.iter().flat_map(|m|
m.offsets().lengths()));
+
+ let array = MapArray::try_new(
+ Arc::clone(field),
+ value_offset_buffer,
+ // Safety: Map entries are always StructArrays, so this downcast is
guaranteed to succeed
+ concatenated_entries.as_struct().clone(),
+ map_nulls, // `None` when no input had nulls
+ ordered,
+ )?; // any error returned by `try_new` is propagated to the caller
+
+ Ok(Arc::new(array))
+}
+
fn concat_list_view<OffsetSize: OffsetSizeTrait>(
arrays: &[&dyn Array],
field: &FieldRef,
@@ -482,6 +548,7 @@ pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
DataType::LargeList(field) => concat_lists::<i64>(arrays, field),
DataType::ListView(field) => concat_list_view::<i32>(arrays, field),
DataType::LargeListView(field) => concat_list_view::<i64>(arrays,
field),
+ DataType::Map(field, ordered) => concat_maps(arrays, field, *ordered),
DataType::Struct(fields) => concat_structs(arrays, fields),
DataType::Utf8 => concat_bytes::<Utf8Type>(arrays),
DataType::LargeUtf8 => concat_bytes::<LargeUtf8Type>(arrays),
@@ -561,7 +628,8 @@ pub fn concat_batches<'a>(
mod tests {
use super::*;
use arrow_array::builder::{
- GenericListBuilder, Int64Builder, ListViewBuilder,
StringDictionaryBuilder,
+ GenericListBuilder, Int32Builder as Int32ArrayBuilder, Int64Builder,
ListViewBuilder,
+ MapBuilder, StringBuilder, StringDictionaryBuilder,
};
use arrow_schema::{Field, Schema};
use std::fmt::Debug;
@@ -1900,4 +1968,117 @@ mod tests {
assert_eq!(values.values(), &[10, 20, 30]);
assert_eq!(&[2, 3, 5], run_ends);
}
+
+ /// A single row of a {String -> Int32} map: `None` for a null row,
otherwise
+ /// the list of (key, optional value) entries.
+ type StringIntMapRow<'a> = Option<Vec<(&'a str, Option<i32>)>>; // 'a is the lifetime of the borrowed key strings
+
+ /// Helper to build a MapArray of {String -> Int32} from a list of entries
per row.
+ fn build_string_int_map(rows: Vec<StringIntMapRow>) -> MapArray { // each element of `rows` is appended as one map row
+ let mut builder = MapBuilder::new(None, StringBuilder::new(),
Int32ArrayBuilder::new());
+ for row in rows {
+ match row {
+ Some(entries) => { // valid row: append its key/value pairs, then close the row
+ for (k, v) in entries {
+ builder.keys().append_value(k);
+ builder.values().append_option(v); // a `None` value is appended as a null
+ }
+ builder.append(true).unwrap(); // finish the current row as valid
+ }
+ None => { // null row: no keys or values are appended
+ builder.append(false).unwrap(); // finish the current row as null
+ }
+ }
+ }
+ builder.finish()
+ }
+
+ #[test]
+ fn test_concat_map_arrays() { // concatenates two maps, one of which contains a null row
+ let map1 = build_string_int_map(vec![ // 2 rows, 3 entries, no nulls
+ Some(vec![("a", Some(1)), ("b", Some(2))]),
+ Some(vec![("c", Some(3))]),
+ ]);
+ let map2 = build_string_int_map(vec![ // 3 rows, 3 entries, middle row is null
+ Some(vec![("d", Some(4)), ("e", Some(5))]),
+ None,
+ Some(vec![("f", Some(6))]),
+ ]);
+
+ let result = concat(&[&map1, &map2]).unwrap();
+ let result_map = result.as_map();
+
+ assert_eq!(result_map.len(), 5); // 2 rows from map1 + 3 rows from map2
+ assert_eq!(result_map.null_count(), 1); // the single null row of map2
+
+ // Check offsets: the null row contributes a zero-length range (5 -> 5)
+ assert_eq!(result_map.value_offsets(), &[0, 2, 3, 5, 5, 6]);
+
+ // Check keys: map1's entries followed by map2's, in order
+ let keys = result_map.keys().as_string::<i32>();
+ let expected_keys: Vec<&str> = vec!["a", "b", "c", "d", "e", "f"];
+ let actual_keys: Vec<&str> = keys.iter().map(|v| v.unwrap()).collect();
+ assert_eq!(actual_keys, expected_keys);
+
+ // Check values: same order as the keys
+ let values = result_map.values().as_primitive::<Int32Type>();
+ assert_eq!(values.values(), &[1, 2, 3, 4, 5, 6]);
+ }
+
+ #[test]
+ fn test_concat_map_arrays_sliced() { // concatenates a sliced map with an unsliced one
+ let map = build_string_int_map(vec![ // 4 rows, 5 entries in total
+ Some(vec![("a", Some(1))]),
+ Some(vec![("b", Some(2)), ("c", Some(3))]),
+ Some(vec![("d", Some(4))]),
+ Some(vec![("e", Some(5))]),
+ ]);
+
+ // Slice to get the middle two rows: [("b",2),("c",3)] and [("d",4)]
+ let sliced = map.slice(1, 2); // NOTE(review): presumably keeps the parent's entries, so the first offset is non-zero — confirm
+
+ let map2 = build_string_int_map(vec![Some(vec![("f", Some(6))])]);
+
+ let result = concat(&[&sliced, &map2]).unwrap();
+ let result_map = result.as_map();
+
+ assert_eq!(result_map.len(), 3); // 2 sliced rows + 1 row from map2
+ assert_eq!(result_map.value_offsets(), &[0, 2, 3, 4]); // output offsets start at 0
+
+ let keys = result_map.keys().as_string::<i32>();
+ let actual_keys: Vec<&str> = keys.iter().map(|v| v.unwrap()).collect();
+ assert_eq!(actual_keys, vec!["b", "c", "d", "f"]); // "a" and "e" lie outside the slice and must not appear
+ }
+
+ #[test]
+ fn test_concat_map_arrays_with_nulls() { // both inputs contain a null row
+ let map1 = build_string_int_map(vec![Some(vec![("a", Some(1))]),
None]);
+ let map2 = build_string_int_map(vec![None, Some(vec![("b",
Some(2))])]);
+
+ let result = concat(&[&map1, &map2]).unwrap();
+ let result_map = result.as_map();
+
+ assert_eq!(result_map.len(), 4); // 2 rows from each input
+ assert_eq!(result_map.null_count(), 2); // one null row from each input
+ assert!(result_map.is_valid(0));
+ assert!(result_map.is_null(1)); // map1's second row
+ assert!(result_map.is_null(2)); // map2's first row
+ assert!(result_map.is_valid(3));
+ }
+
+ #[test]
+ fn test_concat_map_arrays_empty_maps() { // inputs contain valid rows that hold zero entries
+ let map1 = build_string_int_map(vec![Some(vec![]), Some(vec![("a",
Some(1))])]);
+ let map2 = build_string_int_map(vec![ // empty row followed by a two-entry row
+ Some(vec![]),
+ Some(vec![("b", Some(2)), ("c", Some(3))]),
+ ]);
+
+ let result = concat(&[&map1, &map2]).unwrap();
+ let result_map = result.as_map();
+
+ assert_eq!(result_map.len(), 4); // 2 rows from each input
+ assert_eq!(result_map.null_count(), 0); // empty maps are valid rows, not nulls
+ assert_eq!(result_map.value_offsets(), &[0, 0, 1, 1, 3]); // each empty row repeats the previous offset
+ }
}