This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new d692df0358 feat: Optimize hash util for `MapArray` (#20179)
d692df0358 is described below
commit d692df03585f77bd8112d41fa80e2ae239785e9c
Author: Jonathan Chen <[email protected]>
AuthorDate: Wed Feb 18 08:42:02 2026 -0500
feat: Optimize hash util for `MapArray` (#20179)
## Which issue does this PR close?
- Closes #20151.
## Rationale for this change
Reduce the irrelevant data hashed for `MapArray`: when a `MapArray` is sliced, its offsets reference only part of the underlying entries buffer, so hashing all entries wastes work on elements no row uses.
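To illustrate what the optimization exploits, here is a standalone sketch (not code from this PR; the row counts and field names are made up): a sliced `MapArray` keeps its full entries child while its offsets window references only a sub-range, so hashing `entries()` wholesale hashes data no row refers to.

```rust
use std::sync::Arc;

use arrow::array::{Array, Int32Array, Int64Array, MapArray, StructArray};
use arrow::buffer::{OffsetBuffer, ScalarBuffer};
use arrow::datatypes::{DataType, Field, Fields};

fn main() {
    // 4 rows x 2 entries each = 8 underlying entries.
    let fields = Fields::from(vec![
        Field::new("keys", DataType::Int32, false),
        Field::new("values", DataType::Int64, true),
    ]);
    let keys = Int32Array::from_iter_values(0..8);
    let values = Int64Array::from_iter_values(0..8);
    let entries = StructArray::try_new(
        fields.clone(),
        vec![Arc::new(keys), Arc::new(values)],
        None,
    )
    .unwrap();
    let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0i32, 2, 4, 6, 8]));
    let field = Arc::new(Field::new("entries", DataType::Struct(fields), false));
    let map = MapArray::new(field, offsets, entries, None, false);

    // Slice out the second row: it references entries [2, 4) only, but the
    // entries child still holds all 8 entries.
    let sliced = map.slice(1, 1);
    let first = sliced.offsets().first().copied().unwrap_or_default() as usize;
    let last = sliced.offsets().last().copied().unwrap_or_default() as usize;
    assert_eq!((first, last), (2, 4));
    assert_eq!(sliced.entries().len(), 8);
    // Previously all 8 entries were hashed; with this change only the entry
    // columns sliced to `[first, last)` are, and the per-row ranges are
    // shifted by `first`.
}
```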
## What changes are included in this PR?
- `hash_map_array` now slices the entry columns to the `[first_offset, last_offset)` range its offsets actually reference before hashing, and shifts each row's range by `first_offset`.
- `hash_union_array` is split into a default path (dense unions, and sparse unions with 1-2 types) and `hash_sparse_union_array`, which uses `take` to hash only the child elements referenced by `type_ids` and scatters the hashes back; see the sketch below.
- New benchmarks in `with_hashes.rs` cover list, map, and union arrays, including sliced variants.
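The sparse-union path reduces to a take/scatter. Roughly, as a standalone sketch with made-up data (not the PR's code):

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int64Array, UInt32Array};
use arrow::compute::take;

fn main() -> Result<(), arrow::error::ArrowError> {
    // In a sparse union every child spans all N rows, but a row only
    // references the child selected by its type_id.
    let child: ArrayRef = Arc::new(Int64Array::from(vec![10, 20, 30, 40, 50]));
    let type_ids = [0i8, 1, 0, 1, 0];

    // Gather only the rows that reference child 0 ...
    let indices: UInt32Array = type_ids
        .iter()
        .enumerate()
        .filter(|&(_, &t)| t == 0)
        .map(|(i, _)| Some(i as u32))
        .collect();
    let referenced = take(child.as_ref(), &indices, None)?;
    assert_eq!(referenced.len(), 3); // hash 3 elements instead of 5

    // ... then hash `referenced` and scatter each hash back to rows 0, 2, 4
    // by combining it into the hash buffer at that row.
    Ok(())
}
```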
## Are these changes tested?
The optimized paths are exercised by the new benchmarks in `datafusion/common/benches/with_hashes.rs`, including sliced variants that assert the expected hash-buffer length.
## Are there any user-facing changes?
No; this is an internal performance optimization with no public API changes.
---
datafusion/common/benches/with_hashes.rs | 252 ++++++++++++++++++++++++++++++-
datafusion/common/src/hash_utils.rs | 126 +++++++++++++++-
2 files changed, 363 insertions(+), 15 deletions(-)
diff --git a/datafusion/common/benches/with_hashes.rs b/datafusion/common/benches/with_hashes.rs
index 64ce8e8dfe..9ee31d9c4b 100644
--- a/datafusion/common/benches/with_hashes.rs
+++ b/datafusion/common/benches/with_hashes.rs
@@ -19,13 +19,13 @@
use ahash::RandomState;
use arrow::array::{
- Array, ArrayRef, ArrowPrimitiveType, DictionaryArray, GenericStringArray,
- NullBufferBuilder, OffsetSizeTrait, PrimitiveArray, RunArray, StringViewArray,
- StructArray, make_array,
+ Array, ArrayRef, ArrowPrimitiveType, DictionaryArray, GenericStringArray, Int32Array,
+ Int64Array, ListArray, MapArray, NullBufferBuilder, OffsetSizeTrait, PrimitiveArray,
+ RunArray, StringViewArray, StructArray, UnionArray, make_array,
};
-use arrow::buffer::NullBuffer;
+use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
use arrow::datatypes::{
- ArrowDictionaryKeyType, DataType, Field, Fields, Int32Type, Int64Type,
+ ArrowDictionaryKeyType, DataType, Field, Fields, Int32Type, Int64Type, UnionFields,
};
use criterion::{Bencher, Criterion, criterion_group, criterion_main};
use datafusion_common::hash_utils::with_hashes;
@@ -40,6 +40,7 @@ const BATCH_SIZE: usize = 8192;
struct BenchData {
name: &'static str,
array: ArrayRef,
+ /// Union arrays can't have null bitmasks added
supports_nulls: bool,
}
@@ -78,6 +79,26 @@ fn criterion_benchmark(c: &mut Criterion) {
array: pool.dictionary_array::<Int32Type>(BATCH_SIZE),
supports_nulls: true,
},
+ BenchData {
+ name: "list_array",
+ array: list_array(BATCH_SIZE),
+ supports_nulls: true,
+ },
+ BenchData {
+ name: "map_array",
+ array: map_array(BATCH_SIZE),
+ supports_nulls: true,
+ },
+ BenchData {
+ name: "sparse_union",
+ array: sparse_union_array(BATCH_SIZE),
+ supports_nulls: false,
+ },
+ BenchData {
+ name: "dense_union",
+ array: dense_union_array(BATCH_SIZE),
+ supports_nulls: false,
+ },
BenchData {
name: "struct_array",
array: create_struct_array(&pool, BATCH_SIZE),
@@ -103,10 +124,9 @@ fn criterion_benchmark(c: &mut Criterion) {
let arrays = vec![array.clone(), array.clone(), array.clone()];
do_hash_test(b, &arrays);
});
-
+ // Union arrays can't have null bitmasks
if supports_nulls {
let nullable_array = add_nulls(&array);
-
c.bench_function(&format!("{name}: single, nulls"), |b| {
do_hash_test(b, std::slice::from_ref(&nullable_array));
});
@@ -268,6 +288,222 @@ where
Arc::new(array)
}
+/// Benchmark sliced arrays to demonstrate the optimization: when an array is
+/// sliced, the underlying buffer may be much larger than what's referenced by
+/// the slice. The optimization avoids hashing unreferenced elements.
+fn sliced_array_benchmark(c: &mut Criterion) {
+ // Test with different slice ratios: slice_size / total_size
+ // Smaller ratio = more potential savings from the optimization
+ let slice_ratios = [10, 5, 2]; // 1/10, 1/5, 1/2 of total
+
+ for ratio in slice_ratios {
+ let total_rows = BATCH_SIZE * ratio;
+ let slice_offset = BATCH_SIZE * (ratio / 2); // Take from middle
+ let slice_len = BATCH_SIZE;
+
+ // Sliced ListArray
+ {
+ let full_array = list_array(total_rows);
+ let sliced: ArrayRef = Arc::new(
+ full_array
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .unwrap()
+ .slice(slice_offset, slice_len),
+ );
+ c.bench_function(
+ &format!("list_array_sliced: 1/{ratio} of {total_rows} rows"),
+ |b| {
+ do_hash_test_with_len(b, std::slice::from_ref(&sliced), slice_len);
+ },
+ );
+ }
+
+ // Sliced MapArray
+ {
+ let full_array = map_array(total_rows);
+ let sliced: ArrayRef = Arc::new(
+ full_array
+ .as_any()
+ .downcast_ref::<MapArray>()
+ .unwrap()
+ .slice(slice_offset, slice_len),
+ );
+ c.bench_function(
+ &format!("map_array_sliced: 1/{ratio} of {total_rows} rows"),
+ |b| {
+ do_hash_test_with_len(b, std::slice::from_ref(&sliced), slice_len);
+ },
+ );
+ }
+
+ // Sliced Sparse UnionArray
+ {
+ let full_array = sparse_union_array(total_rows);
+ let sliced: ArrayRef = Arc::new(
+ full_array
+ .as_any()
+ .downcast_ref::<UnionArray>()
+ .unwrap()
+ .slice(slice_offset, slice_len),
+ );
+ c.bench_function(
+ &format!("sparse_union_sliced: 1/{ratio} of {total_rows}
rows"),
+ |b| {
+ do_hash_test_with_len(b, std::slice::from_ref(&sliced), slice_len);
+ },
+ );
+ }
+ }
+}
+
+fn do_hash_test_with_len(b: &mut Bencher, arrays: &[ArrayRef], expected_len: usize) {
+ let state = RandomState::new();
+ b.iter(|| {
+ with_hashes(arrays, &state, |hashes| {
+ assert_eq!(hashes.len(), expected_len);
+ Ok(())
+ })
+ .unwrap();
+ });
+}
+
+fn list_array(num_rows: usize) -> ArrayRef {
+ let mut rng = make_rng();
+ let elements_per_row = 5;
+ let total_elements = num_rows * elements_per_row;
+
+ let values: Int64Array = (0..total_elements)
+ .map(|_| Some(rng.random::<i64>()))
+ .collect();
+ let offsets: Vec<i32> = (0..=num_rows)
+ .map(|i| (i * elements_per_row) as i32)
+ .collect();
+
+ Arc::new(ListArray::new(
+ Arc::new(Field::new("item", DataType::Int64, true)),
+ OffsetBuffer::new(ScalarBuffer::from(offsets)),
+ Arc::new(values),
+ None,
+ ))
+}
+
+fn map_array(num_rows: usize) -> ArrayRef {
+ let mut rng = make_rng();
+ let entries_per_row = 5;
+ let total_entries = num_rows * entries_per_row;
+
+ let keys: Int32Array = (0..total_entries)
+ .map(|_| Some(rng.random::<i32>()))
+ .collect();
+ let values: Int64Array = (0..total_entries)
+ .map(|_| Some(rng.random::<i64>()))
+ .collect();
+ let offsets: Vec<i32> = (0..=num_rows)
+ .map(|i| (i * entries_per_row) as i32)
+ .collect();
+
+ let entries = StructArray::try_new(
+ Fields::from(vec![
+ Field::new("keys", DataType::Int32, false),
+ Field::new("values", DataType::Int64, true),
+ ]),
+ vec![Arc::new(keys), Arc::new(values)],
+ None,
+ )
+ .unwrap();
+
+ Arc::new(MapArray::new(
+ Arc::new(Field::new(
+ "entries",
+ DataType::Struct(Fields::from(vec![
+ Field::new("keys", DataType::Int32, false),
+ Field::new("values", DataType::Int64, true),
+ ])),
+ false,
+ )),
+ OffsetBuffer::new(ScalarBuffer::from(offsets)),
+ entries,
+ None,
+ false,
+ ))
+}
+
+fn sparse_union_array(num_rows: usize) -> ArrayRef {
+ let mut rng = make_rng();
+ let num_types = 5;
+
+ let type_ids: Vec<i8> = (0..num_rows)
+ .map(|_| rng.random_range(0..num_types) as i8)
+ .collect();
+ let (fields, children): (Vec<_>, Vec<_>) = (0..num_types)
+ .map(|i| {
+ (
+ (
+ i as i8,
+ Arc::new(Field::new(format!("f{i}"), DataType::Int64,
true)),
+ ),
+ primitive_array::<Int64Type>(num_rows),
+ )
+ })
+ .unzip();
+
+ Arc::new(
+ UnionArray::try_new(
+ UnionFields::from_iter(fields),
+ ScalarBuffer::from(type_ids),
+ None,
+ children,
+ )
+ .unwrap(),
+ )
+}
+
+fn dense_union_array(num_rows: usize) -> ArrayRef {
+ let mut rng = make_rng();
+ let num_types = 5;
+ let type_ids: Vec<i8> = (0..num_rows)
+ .map(|_| rng.random_range(0..num_types) as i8)
+ .collect();
+
+ let mut type_counts = vec![0i32; num_types];
+ for &tid in &type_ids {
+ type_counts[tid as usize] += 1;
+ }
+
+ let mut current_offsets = vec![0i32; num_types];
+ let offsets: Vec<i32> = type_ids
+ .iter()
+ .map(|&tid| {
+ let offset = current_offsets[tid as usize];
+ current_offsets[tid as usize] += 1;
+ offset
+ })
+ .collect();
+
+ let (fields, children): (Vec<_>, Vec<_>) = (0..num_types)
+ .map(|i| {
+ (
+ (
+ i as i8,
+ Arc::new(Field::new(format!("f{i}"), DataType::Int64,
true)),
+ ),
+ primitive_array::<Int64Type>(type_counts[i] as usize),
+ )
+ })
+ .unzip();
+
+ Arc::new(
+ UnionArray::try_new(
+ UnionFields::from_iter(fields),
+ ScalarBuffer::from(type_ids),
+ Some(ScalarBuffer::from(offsets)),
+ children,
+ )
+ .unwrap(),
+ )
+}
+
fn boolean_array(array_len: usize) -> ArrayRef {
let mut rng = make_rng();
Arc::new(
@@ -329,5 +565,5 @@ where
)
}
-criterion_group!(benches, criterion_benchmark);
+criterion_group!(benches, criterion_benchmark, sliced_array_benchmark);
criterion_main!(benches);
diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs
index 1489f688c3..3be6118c55 100644
--- a/datafusion/common/src/hash_utils.rs
+++ b/datafusion/common/src/hash_utils.rs
@@ -20,10 +20,12 @@
use ahash::RandomState;
use arrow::array::types::{IntervalDayTime, IntervalMonthDayNano};
use arrow::array::*;
+use arrow::compute::take;
use arrow::datatypes::*;
#[cfg(not(feature = "force_hash_collisions"))]
use arrow::{downcast_dictionary_array, downcast_primitive_array};
use itertools::Itertools;
+use std::collections::HashMap;
#[cfg(not(feature = "force_hash_collisions"))]
use crate::cast::{
@@ -541,15 +543,29 @@ fn hash_map_array(
let offsets = array.offsets();
// Create hashes for each entry in each row
- let mut values_hashes = vec![0u64; array.entries().len()];
- create_hashes(array.entries().columns(), random_state, &mut values_hashes)?;
+ let first_offset = offsets.first().copied().unwrap_or_default() as usize;
+ let last_offset = offsets.last().copied().unwrap_or_default() as usize;
+ let entries_len = last_offset - first_offset;
+
+ // Only hash the entries that are actually referenced
+ let mut values_hashes = vec![0u64; entries_len];
+ let entries = array.entries();
+ let sliced_columns: Vec<ArrayRef> = entries
+ .columns()
+ .iter()
+ .map(|col| col.slice(first_offset, entries_len))
+ .collect();
+ create_hashes(&sliced_columns, random_state, &mut values_hashes)?;
// Combine the hashes for entries on each row with each other and previous hash for that row
+ // Adjust indices by first_offset since values_hashes is sliced starting from first_offset
if let Some(nulls) = nulls {
for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() {
if nulls.is_valid(i) {
let hash = &mut hashes_buffer[i];
- for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] {
+ for values_hash in &values_hashes
+ [start.as_usize() - first_offset..stop.as_usize() - first_offset]
+ {
*hash = combine_hashes(*hash, *values_hash);
}
}
@@ -557,7 +573,9 @@ fn hash_map_array(
} else {
for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() {
let hash = &mut hashes_buffer[i];
- for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] {
+ for values_hash in &values_hashes
+ [start.as_usize() - first_offset..stop.as_usize() - first_offset]
+ {
*hash = combine_hashes(*hash, *values_hash);
}
}
@@ -662,14 +680,42 @@ fn hash_union_array(
random_state: &RandomState,
hashes_buffer: &mut [u64],
) -> Result<()> {
- use std::collections::HashMap;
-
let DataType::Union(union_fields, _mode) = array.data_type() else {
unreachable!()
};
- let mut child_hashes = HashMap::with_capacity(union_fields.len());
+ if array.is_dense() {
+ // Dense union: children only contain values of their type, so they're already compact.
+ // Use the default hashing approach which is efficient for dense unions.
+ hash_union_array_default(array, union_fields, random_state, hashes_buffer)
+ } else {
+ // Sparse union: each child has the same length as the union array.
+ // Optimization: only hash the elements that are actually referenced by type_ids,
+ // instead of hashing all K*N elements (where K = num types, N = array length).
+ hash_sparse_union_array(array, union_fields, random_state, hashes_buffer)
+ }
+}
+
+/// Default hashing for union arrays - hashes all elements of each child array fully.
+///
+/// This approach works for both dense and sparse union arrays:
+/// - Dense unions: children are compact (each child only contains values of that type)
+/// - Sparse unions: children have the same length as the union array
+///
+/// For sparse unions with 3+ types, the optimized take/scatter approach in
+/// `hash_sparse_union_array` is more efficient, but for 1-2 types or dense unions,
+/// this simpler approach is preferred.
+#[cfg(not(feature = "force_hash_collisions"))]
+fn hash_union_array_default(
+ array: &UnionArray,
+ union_fields: &UnionFields,
+ random_state: &RandomState,
+ hashes_buffer: &mut [u64],
+) -> Result<()> {
+ let mut child_hashes: HashMap<i8, Vec<u64>> =
+ HashMap::with_capacity(union_fields.len());
+ // Hash each child array fully
for (type_id, _field) in union_fields.iter() {
let child = array.child(type_id);
let mut child_hash_buffer = vec![0; child.len()];
@@ -678,6 +724,9 @@ fn hash_union_array(
child_hashes.insert(type_id, child_hash_buffer);
}
+ // Combine hashes for each row using the appropriate child offset
+ // For dense unions: value_offset points to the actual position in the child
+ // For sparse unions: value_offset equals the row index
#[expect(clippy::needless_range_loop)]
for i in 0..array.len() {
let type_id = array.type_id(i);
@@ -690,6 +739,69 @@ fn hash_union_array(
Ok(())
}
+/// Hash a sparse union array.
+/// Sparse unions have child arrays with the same length as the union array.
+/// For 3+ types, we optimize by only hashing the N elements that are actually used
+/// (via take/scatter), instead of hashing all K*N elements.
+///
+/// For 1-2 types, the overhead of take/scatter outweighs the benefit, so we use
+/// the default approach of hashing all children (same as dense unions).
+#[cfg(not(feature = "force_hash_collisions"))]
+fn hash_sparse_union_array(
+ array: &UnionArray,
+ union_fields: &UnionFields,
+ random_state: &RandomState,
+ hashes_buffer: &mut [u64],
+) -> Result<()> {
+ use std::collections::HashMap;
+
+ // For 1-2 types, the take/scatter overhead isn't worth it.
+ // Fall back to the default approach (same as dense union).
+ if union_fields.len() <= 2 {
+ return hash_union_array_default(
+ array,
+ union_fields,
+ random_state,
+ hashes_buffer,
+ );
+ }
+
+ let type_ids = array.type_ids();
+
+ // Group indices by type_id
+ let mut indices_by_type: HashMap<i8, Vec<u32>> = HashMap::new();
+ for (i, &type_id) in type_ids.iter().enumerate() {
+ indices_by_type.entry(type_id).or_default().push(i as u32);
+ }
+
+ // For each type, extract only the needed elements, hash them, and scatter back
+ for (type_id, _field) in union_fields.iter() {
+ if let Some(indices) = indices_by_type.get(&type_id) {
+ if indices.is_empty() {
+ continue;
+ }
+
+ let child = array.child(type_id);
+ let indices_array = UInt32Array::from(indices.clone());
+
+ // Extract only the elements we need using take()
+ let filtered = take(child.as_ref(), &indices_array, None)?;
+
+ // Hash the filtered array
+ let mut filtered_hashes = vec![0u64; filtered.len()];
+ create_hashes([&filtered], random_state, &mut filtered_hashes)?;
+
+ // Scatter hashes back to correct positions
+ for (hash, &idx) in filtered_hashes.iter().zip(indices.iter()) {
+ hashes_buffer[idx as usize] =
+ combine_hashes(hashes_buffer[idx as usize], *hash);
+ }
+ }
+ }
+
+ Ok(())
+}
+
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_fixed_list_array(
array: &FixedSizeListArray,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]