This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 068f96f049 Hash UnionArrays (#18718)
068f96f049 is described below
commit 068f96f04906dac223246d6f3608196f194b864f
Author: Matthew Kim <[email protected]>
AuthorDate: Wed Nov 19 21:14:08 2025 -0500
Hash UnionArrays (#18718)
## Which issue does this PR close?
- Closes https://github.com/apache/datafusion/issues/18717
## Rationale for this change
This PR adds hashing support for `Union` data types, enabling `GROUP BY`,
`DISTINCT`, hash joins, and aggregations on union-typed columns.
`hash_union_array` hashes each child array once. Then, for every row, it
uses the row's type id and value offset to look up the corresponding child
hash and combines it into the row's output hash.
---
datafusion/common/src/hash_utils.rs | 155 +++++++++++++++++++++++++++++++++++-
1 file changed, 154 insertions(+), 1 deletion(-)
diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs
index 0fa47671d3..c6bd53356f 100644
--- a/datafusion/common/src/hash_utils.rs
+++ b/datafusion/common/src/hash_utils.rs
@@ -28,7 +28,7 @@ use arrow::{downcast_dictionary_array, downcast_primitive_array};
use crate::cast::{
as_binary_view_array, as_boolean_array, as_fixed_size_list_array,
as_generic_binary_array, as_large_list_array, as_list_array, as_map_array,
- as_string_array, as_string_view_array, as_struct_array,
+ as_string_array, as_string_view_array, as_struct_array, as_union_array,
};
use crate::error::Result;
use crate::error::{_internal_datafusion_err, _internal_err};
@@ -417,6 +417,40 @@ where
Ok(())
}
+/// Hashes every row of a [`UnionArray`] into `hashes_buffer`.
+///
+/// Each child array is hashed exactly once into its own scratch buffer. Every
+/// row then selects the child named by its type id, reads that child's hash at
+/// the row's [`UnionArray::value_offset`], and combines it into
+/// `hashes_buffer[i]` with [`combine_hashes`].
+///
+/// The type id itself is not mixed into the hash. Given the same incoming
+/// hash, rows of different variants whose child hashes are equal therefore
+/// produce the same row hash.
+///
+/// # Errors
+///
+/// Returns an internal error if `array` does not have a `Union` data type, if
+/// a row references a type id that is not among the union's fields, or if
+/// hashing a child array fails.
+///
+/// # Panics
+///
+/// Panics if `hashes_buffer` is shorter than `array`.
+#[cfg(not(feature = "force_hash_collisions"))]
+fn hash_union_array(
+    array: &UnionArray,
+    random_state: &RandomState,
+    hashes_buffer: &mut [u64],
+) -> Result<()> {
+    use std::collections::HashMap;
+
+    let DataType::Union(union_fields, _mode) = array.data_type() else {
+        return _internal_err!(
+            "hash_union_array expected a Union array, got {}",
+            array.data_type()
+        );
+    };
+
+    // Hash every child once; each row then picks its value out of these buffers
+    // instead of re-hashing the child value per row.
+    let mut child_hashes = HashMap::with_capacity(union_fields.len());
+    for (type_id, _field) in union_fields.iter() {
+        let child = array.child(type_id);
+        let mut child_hash_buffer = vec![0; child.len()];
+        create_hashes([child], random_state, &mut child_hash_buffer)?;
+        child_hashes.insert(type_id, child_hash_buffer);
+    }
+
+    for (i, &type_id) in array.type_ids().iter().enumerate() {
+        // `UnionArray` construction validates type ids, so a miss here is an
+        // invariant violation; report it instead of panicking.
+        let child_hash = child_hashes.get(&type_id).ok_or_else(|| {
+            _internal_datafusion_err!("invalid type_id {type_id} in union array")
+        })?;
+        let child_offset = array.value_offset(i);
+        hashes_buffer[i] = combine_hashes(hashes_buffer[i], child_hash[child_offset]);
+    }
+
+    Ok(())
+}
+
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_fixed_list_array(
array: &FixedSizeListArray,
@@ -497,6 +531,10 @@ fn hash_single_array(
let array = as_fixed_size_list_array(array)?;
hash_fixed_list_array(array, random_state, hashes_buffer)?;
}
+ DataType::Union(_, _) => {
+ let array = as_union_array(array)?;
+ hash_union_array(array, random_state, hashes_buffer)?;
+ }
_ => {
// This is internal because we should have caught this before.
return _internal_err!(
@@ -1168,4 +1206,119 @@ mod tests {
"Error message should mention reentrancy: {err_msg}",
);
}
+
+    #[test]
+    #[cfg(not(feature = "force_hash_collisions"))]
+    fn create_hashes_for_sparse_union_arrays() {
+        // Logical rows: [int(5), str("foo"), int(10), int(5)].
+        // In a sparse union every child has one slot per row; the slots a row
+        // does not select are filled with nulls.
+        let ints: ArrayRef =
+            Arc::new(Int32Array::from(vec![Some(5), None, Some(10), Some(5)]));
+        let strings: ArrayRef =
+            Arc::new(StringArray::from(vec![None, Some("foo"), None, None]));
+
+        let fields = [
+            (0, Arc::new(Field::new("a", DataType::Int32, true))),
+            (1, Arc::new(Field::new("b", DataType::Utf8, true))),
+        ]
+        .into_iter()
+        .collect();
+
+        let union = UnionArray::try_new(
+            fields,
+            vec![0_i8, 1, 0, 0].into(),
+            None,
+            vec![ints, strings],
+        )
+        .unwrap();
+        let union_ref: ArrayRef = Arc::new(union);
+
+        let mut hashes = vec![0; union_ref.len()];
+        create_hashes(
+            &[union_ref],
+            &RandomState::with_seeds(0, 0, 0, 0),
+            &mut hashes,
+        )
+        .unwrap();
+
+        // Same variant (int) and same value (5) -> identical hashes.
+        assert_eq!(hashes[3], hashes[0]);
+        // Same variant, different values (5 vs 10).
+        assert_ne!(hashes[2], hashes[0]);
+        // Different variants (int 5 vs string "foo").
+        assert_ne!(hashes[1], hashes[0]);
+    }
+
+    #[test]
+    #[cfg(not(feature = "force_hash_collisions"))]
+    fn create_hashes_for_sparse_union_arrays_with_nulls() {
+        // Logical rows: [int(5), str("foo"), int(null), str(null)].
+        let ints: ArrayRef = Arc::new(Int32Array::from(vec![Some(5), None, None, None]));
+        let strings: ArrayRef =
+            Arc::new(StringArray::from(vec![None, Some("foo"), None, None]));
+
+        let fields = [
+            (0, Arc::new(Field::new("a", DataType::Int32, true))),
+            (1, Arc::new(Field::new("b", DataType::Utf8, true))),
+        ]
+        .into_iter()
+        .collect();
+
+        let union = UnionArray::try_new(
+            fields,
+            vec![0, 1, 0, 1].into(),
+            None,
+            vec![ints, strings],
+        )
+        .unwrap();
+        let union_ref: ArrayRef = Arc::new(union);
+
+        let mut hashes = vec![0; union_ref.len()];
+        create_hashes(
+            &[union_ref],
+            &RandomState::with_seeds(0, 0, 0, 0),
+            &mut hashes,
+        )
+        .unwrap();
+
+        // A null int and a null string hash the same: both are null values.
+        assert_eq!(hashes[3], hashes[2]);
+        // A present value never matches the null of its own variant.
+        assert_ne!(hashes[2], hashes[0]);
+        assert_ne!(hashes[3], hashes[1]);
+    }
+
+    #[test]
+    #[cfg(not(feature = "force_hash_collisions"))]
+    fn create_hashes_for_dense_union_arrays() {
+        // Logical rows: [67, "norm", 100, "macdonald", 67].
+        // In a dense union each child holds only its own values, and the
+        // offsets buffer maps every row to a slot inside the selected child.
+        let ints: ArrayRef = Arc::new(Int32Array::from(vec![67, 100, 67]));
+        let strings: ArrayRef = Arc::new(StringArray::from(vec!["norm", "macdonald"]));
+
+        let fields = [
+            (0, Arc::new(Field::new("a", DataType::Int32, false))),
+            (1, Arc::new(Field::new("b", DataType::Utf8, false))),
+        ]
+        .into_iter()
+        .collect();
+
+        let union = UnionArray::try_new(
+            fields,
+            vec![0, 1, 0, 1, 0].into(),
+            Some(vec![0, 0, 1, 1, 2].into()),
+            vec![ints, strings],
+        )
+        .unwrap();
+        let union_ref: ArrayRef = Arc::new(union);
+
+        let mut hashes = vec![0; union_ref.len()];
+        create_hashes(
+            &[union_ref],
+            &RandomState::with_seeds(0, 0, 0, 0),
+            &mut hashes,
+        )
+        .unwrap();
+
+        // Different variants: 67 vs "norm", and 100 vs "macdonald".
+        assert_ne!(hashes[1], hashes[0]);
+        assert_ne!(hashes[3], hashes[2]);
+        // Same variant, different values: 67 vs 100, and "norm" vs "macdonald".
+        assert_ne!(hashes[2], hashes[0]);
+        assert_ne!(hashes[3], hashes[1]);
+        // The two rows holding 67 share a hash despite different child offsets.
+        assert_eq!(hashes[4], hashes[0]);
+    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]