alamb commented on code in PR #18449:
URL: https://github.com/apache/datafusion/pull/18449#discussion_r2534248997
##########
datafusion/common/src/hash_utils.rs:
##########
@@ -1000,4 +1088,84 @@ mod tests {
assert_eq!(hashes1, hashes2);
}
+
+ #[test]
+ fn test_with_hashes() {
+ let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
+ let random_state = RandomState::with_seeds(0, 0, 0, 0);
+
+ // Test that with_hashes produces the same results as create_hashes
+ let mut expected_hashes = vec![0; array.len()];
+ create_hashes([&array], &random_state, &mut expected_hashes).unwrap();
+
+ let result = with_hashes([&array], &random_state, |hashes| {
+ assert_eq!(hashes.len(), 4);
+ // Verify hashes match expected values
+ assert_eq!(hashes, &expected_hashes[..]);
+ // Return a copy of the hashes
+ Ok(hashes.to_vec())
+ })
+ .unwrap();
+
+ // Verify callback result is returned correctly
+ assert_eq!(result, expected_hashes);
+ }
+
+ #[test]
+ fn test_with_hashes_multi_column() {
+ let int_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
+ let str_array: ArrayRef = Arc::new(StringArray::from(vec!["a", "b",
"c"]));
+ let random_state = RandomState::with_seeds(0, 0, 0, 0);
+
+ // Test multi-column hashing
+ let mut expected_hashes = vec![0; int_array.len()];
+ create_hashes(
+ [&int_array, &str_array],
+ &random_state,
+ &mut expected_hashes,
+ )
+ .unwrap();
+
+ with_hashes([&int_array, &str_array], &random_state, |hashes| {
+ assert_eq!(hashes.len(), 3);
+ assert_eq!(hashes, &expected_hashes[..]);
+ Ok(())
+ })
+ .unwrap();
+ }
+
+ #[test]
+ fn test_with_hashes_empty_arrays() {
+ let random_state = RandomState::with_seeds(0, 0, 0, 0);
+
+ // Test that passing no arrays returns an error
+ let empty: [&ArrayRef; 0] = [];
+ let result = with_hashes(empty, &random_state, |_hashes| Ok(()));
+
+ assert!(result.is_err());
+ assert!(result
+ .unwrap_err()
+ .to_string()
+ .contains("requires at least one array"));
+ }
+
+ #[test]
+ fn test_with_hashes_reentrancy() {
Review Comment:
Can you please add a test / verify the truncate / shrink-to-fit behavior? I
think that is probably important.
##########
datafusion/common/src/hash_utils.rs:
##########
@@ -478,8 +566,8 @@ impl AsDynArray for &ArrayRef {
pub fn create_hashes<'a, I, T>(
arrays: I,
random_state: &RandomState,
- hashes_buffer: &'a mut Vec<u64>,
-) -> Result<&'a mut Vec<u64>>
+ hashes_buffer: &'a mut [u64],
Review Comment:
I double-checked that the code already asserts that the hashes_buffer and
arrays are the same length (i.e. it doesn't actually use the fact that this is
a Vec to grow the allocation).
##########
datafusion/sqllogictest/test_files/expr.slt:
##########
@@ -1066,6 +1066,213 @@ SELECT '2' NOT IN ('a','b',NULL,1)
----
NULL
+# ========================================================================
+# Comprehensive IN LIST tests with NULL handling
Review Comment:
I always wonder why @claude and similar bots insist the code is
"comprehensive" 😆
##########
datafusion/physical-expr/src/expressions/in_list.rs:
##########
@@ -80,66 +75,56 @@ struct ArrayHashSet {
map: HashMap<usize, (), ()>,
}
-struct ArraySet<T> {
- array: T,
- hash_set: ArrayHashSet,
-}
-
-impl<T> ArraySet<T>
-where
- T: Array + From<ArrayData>,
-{
- fn new(array: &T, hash_set: ArrayHashSet) -> Self {
- Self {
- array: downcast_array(array),
- hash_set,
+impl ArrayHashSet {
+ /// Checks if values in `v` are contained in the `in_array` using this
hash set for lookup.
+ fn contains(
+ &self,
+ v: &dyn Array,
+ in_array: &dyn Array,
+ negated: bool,
+ ) -> Result<BooleanArray> {
+ // Null type comparisons always return null (SQL three-valued logic)
+ if v.data_type() == &DataType::Null || in_array.data_type() ==
&DataType::Null {
+ return Ok(BooleanArray::from(vec![None; v.len()]));
}
- }
-}
-impl<T> Set for ArraySet<T>
-where
- T: Array + 'static,
- for<'a> &'a T: ArrayAccessor,
- for<'a> <&'a T as ArrayAccessor>::Item: IsEqual,
-{
- fn contains(&self, v: &dyn Array, negated: bool) -> Result<BooleanArray> {
downcast_dictionary_array! {
v => {
- let values_contains = self.contains(v.values().as_ref(),
negated)?;
+ let values_contains = self.contains(v.values().as_ref(),
in_array, negated)?;
let result = take(&values_contains, v.keys(), None)?;
return Ok(downcast_array(result.as_ref()))
}
_ => {}
}
- let v = v.as_any().downcast_ref::<T>().unwrap();
- let in_array = &self.array;
- let has_nulls = in_array.null_count() != 0;
+ let needle_nulls = v.logical_nulls();
+ let needle_nulls = needle_nulls.as_ref();
+ let haystack_has_nulls = in_array.null_count() != 0;
+
+ with_hashes([v], &self.state, |hashes| {
+ let cmp = make_comparator(v, in_array, SortOptions::default())?;
+ Ok((0..v.len())
+ .map(|i| {
+ // SQL three-valued logic: null IN (...) is always null
+ if needle_nulls.is_some_and(|nulls| nulls.is_null(i)) {
+ return None;
+ }
- Ok(ArrayIter::new(v)
- .map(|v| {
- v.and_then(|v| {
- let hash = v.hash_one(&self.hash_set.state);
+ let hash = hashes[i];
let contains = self
- .hash_set
.map
.raw_entry()
- .from_hash(hash, |idx|
in_array.value(*idx).is_equal(&v))
+ .from_hash(hash, |idx| cmp(i, *idx).is_eq())
Review Comment:
If we ever need to make this faster, we could potentially add
specializations for different primitive types, and still fall back to the
dynamic comparator
##########
datafusion/physical-expr/src/expressions/in_list.rs:
##########
@@ -25,34 +25,34 @@ use std::sync::Arc;
use crate::physical_expr::physical_exprs_bag_equal;
use crate::PhysicalExpr;
-use arrow::array::types::{IntervalDayTime, IntervalMonthDayNano};
use arrow::array::*;
use arrow::buffer::BooleanBuffer;
use arrow::compute::kernels::boolean::{not, or_kleene};
-use arrow::compute::take;
+use arrow::compute::{take, SortOptions};
use arrow::datatypes::*;
+use arrow::downcast_dictionary_array;
use arrow::util::bit_iterator::BitIndexIterator;
-use arrow::{downcast_dictionary_array, downcast_primitive_array};
-use datafusion_common::cast::{
- as_boolean_array, as_generic_binary_array, as_string_array,
-};
-use datafusion_common::hash_utils::HashValue;
-use datafusion_common::{
- exec_err, internal_err, not_impl_err, DFSchema, Result, ScalarValue,
-};
-use datafusion_expr::ColumnarValue;
-use datafusion_physical_expr_common::datum::compare_with_eq;
+use datafusion_common::hash_utils::with_hashes;
+use datafusion_common::{exec_err, internal_err, DFSchema, Result, ScalarValue};
+use datafusion_expr::{expr_vec_fmt, ColumnarValue};
use ahash::RandomState;
use datafusion_common::HashMap;
use hashbrown::hash_map::RawEntryMut;
+/// Static filter for InList that stores the array and hash set for O(1)
lookups
+#[derive(Debug, Clone)]
Review Comment:
What is the reason to pull StaticFilter out from ArrayHashSet? It took me a
little bit to grok that the fields in ArrayHashSet refer to StaticFilter.
In other words, why not something like:
```rust
/// Static filter for InList that stores the array and hash set for O(1)
lookups
#[derive(Debug, Clone)]
struct StaticFilter {
in_array: ArrayRef,
state: RandomState,
/// Used to provide a lookup from value to in list index
///
/// Note: usize::hash is not used, instead the raw entry
/// API is used to store entries w.r.t their value
map: HashMap<usize, (), ()>,
}
```
##########
datafusion/physical-expr/src/expressions/in_list.rs:
##########
@@ -25,34 +25,34 @@ use std::sync::Arc;
use crate::physical_expr::physical_exprs_bag_equal;
use crate::PhysicalExpr;
-use arrow::array::types::{IntervalDayTime, IntervalMonthDayNano};
use arrow::array::*;
use arrow::buffer::BooleanBuffer;
use arrow::compute::kernels::boolean::{not, or_kleene};
-use arrow::compute::take;
+use arrow::compute::{take, SortOptions};
use arrow::datatypes::*;
+use arrow::downcast_dictionary_array;
use arrow::util::bit_iterator::BitIndexIterator;
-use arrow::{downcast_dictionary_array, downcast_primitive_array};
-use datafusion_common::cast::{
- as_boolean_array, as_generic_binary_array, as_string_array,
-};
-use datafusion_common::hash_utils::HashValue;
-use datafusion_common::{
- exec_err, internal_err, not_impl_err, DFSchema, Result, ScalarValue,
-};
-use datafusion_expr::ColumnarValue;
-use datafusion_physical_expr_common::datum::compare_with_eq;
+use datafusion_common::hash_utils::with_hashes;
+use datafusion_common::{exec_err, internal_err, DFSchema, Result, ScalarValue};
+use datafusion_expr::{expr_vec_fmt, ColumnarValue};
use ahash::RandomState;
use datafusion_common::HashMap;
use hashbrown::hash_map::RawEntryMut;
+/// Static filter for InList that stores the array and hash set for O(1)
lookups
+#[derive(Debug, Clone)]
Review Comment:
Update -- I tried this and it seems to work great
- https://github.com/pydantic/datafusion/pull/44
##########
datafusion/physical-expr/src/expressions/in_list.rs:
##########
@@ -366,18 +315,90 @@ impl PhysicalExpr for InListExpr {
let num_rows = batch.num_rows();
let value = self.expr.evaluate(batch)?;
let r = match &self.static_filter {
- Some(f) => f.contains(value.into_array(num_rows)?.as_ref(),
self.negated)?,
+ Some(filter) => {
+ match value {
+ ColumnarValue::Array(array) => filter.hash_set.contains(
+ &array,
+ filter.array.as_ref(),
+ self.negated,
+ )?,
+ ColumnarValue::Scalar(scalar) => {
+ if scalar.is_null() {
+ // SQL three-valued logic: null IN (...) is always
null
+ // The code below would handle this correctly but
this is a faster path
+ return Ok(ColumnarValue::Array(Arc::new(
+ BooleanArray::from(vec![None; num_rows]),
Review Comment:
This can probably be made faster by bypassing the Vec entirely -- perhaps
via https://docs.rs/arrow/latest/arrow/array/struct.BooleanBufferBuilder.html
(not necessary; I am just pointing it out).
##########
datafusion/sqllogictest/test_files/tpch/plans/q19.slt.part:
##########
@@ -69,13 +69,13 @@ physical_plan
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[],
aggr=[sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]
05)--------CoalesceBatchesExec: target_batch_size=8192
-06)----------HashJoinExec: mode=Partitioned, join_type=Inner,
on=[(l_partkey@0, p_partkey@0)], filter=p_brand@1 = Brand#12 AND p_container@3
IN ([SM CASE, SM BOX, SM PACK, SM PKG]) AND l_quantity@0 >= Some(100),15,2 AND
l_quantity@0 <= Some(1100),15,2 AND p_size@2 <= 5 OR p_brand@1 = Brand#23 AND
p_container@3 IN ([MED BAG, MED BOX, MED PKG, MED PACK]) AND l_quantity@0 >=
Some(1000),15,2 AND l_quantity@0 <= Some(2000),15,2 AND p_size@2 <= 10 OR
p_brand@1 = Brand#34 AND p_container@3 IN ([LG CASE, LG BOX, LG PACK, LG PKG])
AND l_quantity@0 >= Some(2000),15,2 AND l_quantity@0 <= Some(3000),15,2 AND
p_size@2 <= 15, projection=[l_extendedprice@2, l_discount@3]
+06)----------HashJoinExec: mode=Partitioned, join_type=Inner,
on=[(l_partkey@0, p_partkey@0)], filter=p_brand@1 = Brand#12 AND p_container@3
IN (SET) ([SM CASE, SM BOX, SM PACK, SM PKG]) AND l_quantity@0 >=
Some(100),15,2 AND l_quantity@0 <= Some(1100),15,2 AND p_size@2 <= 5 OR
p_brand@1 = Brand#23 AND p_container@3 IN (SET) ([MED BAG, MED BOX, MED PKG,
MED PACK]) AND l_quantity@0 >= Some(1000),15,2 AND l_quantity@0 <=
Some(2000),15,2 AND p_size@2 <= 10 OR p_brand@1 = Brand#34 AND p_container@3 IN
(SET) ([LG CASE, LG BOX, LG PACK, LG PKG]) AND l_quantity@0 >= Some(2000),15,2
AND l_quantity@0 <= Some(3000),15,2 AND p_size@2 <= 15,
projection=[l_extendedprice@2, l_discount@3]
Review Comment:
Why did these queries start being able to use the pre-calculated set? Is it
because InList didn't have a special case for Utf8View before?
##########
datafusion/physical-expr/src/expressions/in_list.rs:
##########
@@ -366,18 +315,90 @@ impl PhysicalExpr for InListExpr {
let num_rows = batch.num_rows();
let value = self.expr.evaluate(batch)?;
let r = match &self.static_filter {
- Some(f) => f.contains(value.into_array(num_rows)?.as_ref(),
self.negated)?,
+ Some(filter) => {
+ match value {
+ ColumnarValue::Array(array) => filter.hash_set.contains(
+ &array,
+ filter.array.as_ref(),
+ self.negated,
+ )?,
+ ColumnarValue::Scalar(scalar) => {
+ if scalar.is_null() {
+ // SQL three-valued logic: null IN (...) is always
null
+ // The code below would handle this correctly but
this is a faster path
+ return Ok(ColumnarValue::Array(Arc::new(
+ BooleanArray::from(vec![None; num_rows]),
+ )));
+ }
+ // Use a 1 row array to avoid code
duplication/branching
+ // Since all we do is compute hash and lookup this
should be efficient enough
+ let array = scalar.to_array()?;
+ let result_array = filter.hash_set.contains(
+ array.as_ref(),
+ filter.array.as_ref(),
+ self.negated,
+ )?;
+ // Broadcast the single result to all rows
+ // Must check is_null() to preserve NULL values (SQL
three-valued logic)
+ if result_array.is_null(0) {
+ BooleanArray::from(vec![None; num_rows])
+ } else {
+ BooleanArray::from_iter(std::iter::repeat_n(
+ result_array.value(0),
+ num_rows,
+ ))
+ }
+ }
+ }
+ }
None => {
+ // No static filter: iterate through each expression, compare,
and OR results
let value = value.into_array(num_rows)?;
- let is_nested = value.data_type().is_nested();
let found = self.list.iter().map(|expr|
expr.evaluate(batch)).try_fold(
BooleanArray::new(BooleanBuffer::new_unset(num_rows),
None),
|result, expr| -> Result<BooleanArray> {
- let rhs = compare_with_eq(
- &value,
- &expr?.into_array(num_rows)?,
- is_nested,
- )?;
+ let rhs = match expr? {
+ ColumnarValue::Array(array) => {
+ let cmp = make_comparator(
+ value.as_ref(),
+ array.as_ref(),
+ SortOptions::default(),
+ )?;
+ (0..num_rows)
+ .map(|i| {
+ if value.is_null(i) ||
array.is_null(i) {
+ return None;
+ }
+ Some(cmp(i, i).is_eq())
+ })
+ .collect::<BooleanArray>()
+ }
+ ColumnarValue::Scalar(scalar) => {
+ // Check if scalar is null once, before the
loop
+ if scalar.is_null() {
+ // If scalar is null, all comparisons
return null
+ BooleanArray::from(vec![None; num_rows])
+ } else {
+ // Convert scalar to 1-element array
+ let array = scalar.to_array()?;
Review Comment:
I am really surprised that using this comparator does not cause a
performance regression compared to using `eq`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]