This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new c1698b7190 [MINOR] Improve performance of `create_hashes` (#6816)
c1698b7190 is described below
commit c1698b7190bb5e3fadb6ead21856fa071c99b143
Author: Daniël Heres <[email protected]>
AuthorDate: Sun Jul 2 21:32:44 2023 +0200
[MINOR] Improve performance of `create_hashes` (#6816)
* Only rehash col >=1, specialize primitive hasher
* Fmt
* Clippy
* Typo
* Typo
* Clippy
* Add docs, assertion
* Fmt
---------
Co-authored-by: Daniël Heres <[email protected]>
---
.../core/src/physical_plan/aggregates/row_hash.rs | 1 +
datafusion/physical-expr/src/hash_utils.rs | 96 ++++++++++++++++------
2 files changed, 74 insertions(+), 23 deletions(-)
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index ba02bc096b..beb70f1b4c 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -335,6 +335,7 @@ impl GroupedHashAggregateStream {
// actually the same key value as the group in
// existing_idx (aka group_values @ row)
let group_state = &group_states[*group_idx];
+
group_rows.row(row) == group_state.group_by_values.row()
});
diff --git a/datafusion/physical-expr/src/hash_utils.rs b/datafusion/physical-expr/src/hash_utils.rs
index b751df928d..44699285a9 100644
--- a/datafusion/physical-expr/src/hash_utils.rs
+++ b/datafusion/physical-expr/src/hash_utils.rs
@@ -84,35 +84,87 @@ macro_rules! hash_float_value {
}
hash_float_value!((half::f16, u16), (f32, u32), (f64, u64));
+/// Builds hash values of PrimitiveArray and writes them into `hashes_buffer`
+/// If `rehash==true` this combines the previous hash value in the buffer
+/// with the new hash using `combine_hashes`
+fn hash_array_primitive<T>(
+ array: &PrimitiveArray<T>,
+ random_state: &RandomState,
+ hashes_buffer: &mut [u64],
+ rehash: bool,
+) where
+ T: ArrowPrimitiveType,
+ <T as arrow_array::ArrowPrimitiveType>::Native: HashValue,
+{
+ if array.null_count() == 0 {
+ if rehash {
+ for (hash, &value) in
hashes_buffer.iter_mut().zip(array.values().iter()) {
+ *hash = combine_hashes(value.hash_one(random_state), *hash);
+ }
+ } else {
+ for (hash, &value) in
hashes_buffer.iter_mut().zip(array.values().iter()) {
+ *hash = value.hash_one(random_state);
+ }
+ }
+ } else if rehash {
+ for (i, hash) in hashes_buffer.iter_mut().enumerate() {
+ if !array.is_null(i) {
+ let value = unsafe { array.value_unchecked(i) };
+ *hash = combine_hashes(value.hash_one(random_state), *hash);
+ }
+ }
+ } else {
+ for (i, hash) in hashes_buffer.iter_mut().enumerate() {
+ if !array.is_null(i) {
+ let value = unsafe { array.value_unchecked(i) };
+ *hash = value.hash_one(random_state);
+ }
+ }
+ }
+}
+
+/// Hashes one array into the `hashes_buffer`
+/// If `rehash==true` this combines the previous hash value in the buffer
+/// with the new hash using `combine_hashes`
fn hash_array<T>(
array: T,
random_state: &RandomState,
hashes_buffer: &mut [u64],
- multi_col: bool,
+ rehash: bool,
) where
T: ArrayAccessor,
T::Item: HashValue,
{
+ assert_eq!(
+ hashes_buffer.len(),
+ array.len(),
+ "hashes_buffer and array should be of equal length"
+ );
+
if array.null_count() == 0 {
- if multi_col {
+ if rehash {
for (i, hash) in hashes_buffer.iter_mut().enumerate() {
- *hash = combine_hashes(array.value(i).hash_one(random_state),
*hash);
+ let value = unsafe { array.value_unchecked(i) };
+ *hash = combine_hashes(value.hash_one(random_state), *hash);
}
} else {
for (i, hash) in hashes_buffer.iter_mut().enumerate() {
- *hash = array.value(i).hash_one(random_state);
+ let value = unsafe { array.value_unchecked(i) };
+ *hash = value.hash_one(random_state);
}
}
- } else if multi_col {
+ } else if rehash {
for (i, hash) in hashes_buffer.iter_mut().enumerate() {
if !array.is_null(i) {
- *hash = combine_hashes(array.value(i).hash_one(random_state),
*hash);
+ let value = unsafe { array.value_unchecked(i) };
+ *hash = combine_hashes(value.hash_one(random_state), *hash);
}
}
} else {
for (i, hash) in hashes_buffer.iter_mut().enumerate() {
if !array.is_null(i) {
- *hash = array.value(i).hash_one(random_state);
+ let value = unsafe { array.value_unchecked(i) };
+ *hash = value.hash_one(random_state);
}
}
}
@@ -208,34 +260,32 @@ pub fn create_hashes<'a>(
random_state: &RandomState,
hashes_buffer: &'a mut Vec<u64>,
) -> Result<&'a mut Vec<u64>> {
- // combine hashes with `combine_hashes` if we have more than 1 column
-
- let multi_col = arrays.len() > 1;
-
- for col in arrays {
+ for (i, col) in arrays.iter().enumerate() {
let array = col.as_ref();
+ // combine hashes with `combine_hashes` for all columns besides the
first
+ let rehash = i >= 1;
downcast_primitive_array! {
- array => hash_array(array, random_state, hashes_buffer, multi_col),
- DataType::Null => hash_null(random_state, hashes_buffer,
multi_col),
- DataType::Boolean => hash_array(as_boolean_array(array)?,
random_state, hashes_buffer, multi_col),
- DataType::Utf8 => hash_array(as_string_array(array)?,
random_state, hashes_buffer, multi_col),
- DataType::LargeUtf8 => hash_array(as_largestring_array(array),
random_state, hashes_buffer, multi_col),
- DataType::Binary =>
hash_array(as_generic_binary_array::<i32>(array)?, random_state, hashes_buffer,
multi_col),
- DataType::LargeBinary =>
hash_array(as_generic_binary_array::<i64>(array)?, random_state, hashes_buffer,
multi_col),
+ array => hash_array_primitive(array, random_state, hashes_buffer,
rehash),
+ DataType::Null => hash_null(random_state, hashes_buffer, rehash),
+ DataType::Boolean => hash_array(as_boolean_array(array)?,
random_state, hashes_buffer, rehash),
+ DataType::Utf8 => hash_array(as_string_array(array)?,
random_state, hashes_buffer, rehash),
+ DataType::LargeUtf8 => hash_array(as_largestring_array(array),
random_state, hashes_buffer, rehash),
+ DataType::Binary =>
hash_array(as_generic_binary_array::<i32>(array)?, random_state, hashes_buffer,
rehash),
+ DataType::LargeBinary =>
hash_array(as_generic_binary_array::<i64>(array)?, random_state, hashes_buffer,
rehash),
DataType::FixedSizeBinary(_) => {
let array: &FixedSizeBinaryArray =
array.as_any().downcast_ref().unwrap();
- hash_array(array, random_state, hashes_buffer, multi_col)
+ hash_array(array, random_state, hashes_buffer, rehash)
}
DataType::Decimal128(_, _) => {
let array = as_primitive_array::<Decimal128Type>(array)?;
- hash_array(array, random_state, hashes_buffer, multi_col)
+ hash_array_primitive(array, random_state, hashes_buffer,
rehash)
}
DataType::Decimal256(_, _) => {
let array = as_primitive_array::<Decimal256Type>(array)?;
- hash_array(array, random_state, hashes_buffer, multi_col)
+ hash_array_primitive(array, random_state, hashes_buffer,
rehash)
}
DataType::Dictionary(_, _) => downcast_dictionary_array! {
- array => hash_dictionary(array, random_state, hashes_buffer,
multi_col)?,
+ array => hash_dictionary(array, random_state, hashes_buffer,
rehash)?,
_ => unreachable!()
}
_ => {