This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new b6832d50f Hash binary values (#3098)
b6832d50f is described below
commit b6832d50fd0bcd3dd428a6ba0da90ff3e58f85ed
Author: Daniël Heres <[email protected]>
AuthorDate: Fri Aug 12 12:11:04 2022 +0200
Hash binary values (#3098)
* Hash binary values
* Large binary impl
---
datafusion/core/src/physical_plan/hash_utils.rs | 53 ++++++++++++++++++++++---
datafusion/sql/src/planner.rs | 2 +
2 files changed, 49 insertions(+), 6 deletions(-)
diff --git a/datafusion/core/src/physical_plan/hash_utils.rs b/datafusion/core/src/physical_plan/hash_utils.rs
index a89247d7e..ba6dda51d 100644
--- a/datafusion/core/src/physical_plan/hash_utils.rs
+++ b/datafusion/core/src/physical_plan/hash_utils.rs
@@ -91,19 +91,19 @@ fn hash_decimal128<'a>(
}
macro_rules! hash_array {
- ($array_type:ident, $column: ident, $ty: ident, $hashes: ident,
$random_state: ident, $multi_col: ident) => {
+ ($array_type:ident, $column: ident, $ty: ty, $hashes: ident,
$random_state: ident, $multi_col: ident) => {
let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
if array.null_count() == 0 {
if $multi_col {
for (i, hash) in $hashes.iter_mut().enumerate() {
*hash = combine_hashes(
- $ty::get_hash(&array.value(i), $random_state),
+ <$ty>::get_hash(&array.value(i), $random_state),
*hash,
);
}
} else {
for (i, hash) in $hashes.iter_mut().enumerate() {
- *hash = $ty::get_hash(&array.value(i), $random_state);
+ *hash = <$ty>::get_hash(&array.value(i), $random_state);
}
}
} else {
@@ -111,7 +111,7 @@ macro_rules! hash_array {
for (i, hash) in $hashes.iter_mut().enumerate() {
if !array.is_null(i) {
*hash = combine_hashes(
- $ty::get_hash(&array.value(i), $random_state),
+ <$ty>::get_hash(&array.value(i), $random_state),
*hash,
);
}
@@ -119,7 +119,7 @@ macro_rules! hash_array {
} else {
for (i, hash) in $hashes.iter_mut().enumerate() {
if !array.is_null(i) {
- *hash = $ty::get_hash(&array.value(i), $random_state);
+ *hash = <$ty>::get_hash(&array.value(i),
$random_state);
}
}
}
@@ -329,6 +329,8 @@ pub fn create_hashes<'a>(
hashes_buffer: &'a mut Vec<u64>,
) -> Result<&'a mut Vec<u64>> {
// combine hashes with `combine_hashes` if we have more than 1 column
+
+ use arrow::array::{BinaryArray, LargeBinaryArray};
let multi_col = arrays.len() > 1;
for col in arrays {
@@ -529,6 +531,26 @@ pub fn create_hashes<'a>(
multi_col
);
}
+ DataType::Binary => {
+ hash_array!(
+ BinaryArray,
+ col,
+ &[u8],
+ hashes_buffer,
+ random_state,
+ multi_col
+ );
+ }
+ DataType::LargeBinary => {
+ hash_array!(
+ LargeBinaryArray,
+ col,
+ &[u8],
+ hashes_buffer,
+ random_state,
+ multi_col
+ );
+ }
DataType::Dictionary(index_type, _) => match **index_type {
DataType::Int8 => {
create_hashes_dictionary::<Int8Type>(
@@ -616,7 +638,10 @@ pub fn create_hashes<'a>(
#[cfg(test)]
mod tests {
use crate::from_slice::FromSlice;
- use arrow::{array::DictionaryArray, datatypes::Int8Type};
+ use arrow::{
+ array::{BinaryArray, DictionaryArray},
+ datatypes::Int8Type,
+ };
use std::sync::Arc;
use super::*;
@@ -653,6 +678,22 @@ mod tests {
Ok(())
}
+ #[test]
+ fn create_hashes_binary() -> Result<()> {
+ let byte_array = Arc::new(BinaryArray::from_vec(vec![
+ &[4, 3, 2],
+ &[4, 3, 2],
+ &[1, 2, 3],
+ ]));
+
+ let random_state = RandomState::with_seeds(0, 0, 0, 0);
+ let hashes_buff = &mut vec![0; byte_array.len()];
+ let hashes = create_hashes(&[byte_array], &random_state, hashes_buff)?;
+ assert_eq!(hashes.len(), 3,);
+
+ Ok(())
+ }
+
#[test]
// Tests actual values of hashes, which are different if forcing collisions
#[cfg(not(feature = "force_hash_collisions"))]
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index 22a9b1f0e..daa4753e4 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -2562,6 +2562,8 @@ pub fn convert_simple_data_type(sql_type: &SQLDataType) -> Result<DataType> {
SQLDataType::Timestamp => Ok(DataType::Timestamp(TimeUnit::Nanosecond,
None)),
SQLDataType::Date => Ok(DataType::Date32),
SQLDataType::Decimal(precision, scale) =>
make_decimal_type(*precision, *scale),
+ SQLDataType::Binary(_) => Ok(DataType::Binary),
+ SQLDataType::Bytea => Ok(DataType::Binary),
other => Err(DataFusionError::NotImplemented(format!(
"Unsupported SQL type {:?}",
other