This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 9fb5ff9935 Fix join on arrays of unhashable types and allow hash join on all types supported at run-time (#13388)
9fb5ff9935 is described below

commit 9fb5ff99350cea8d360a0519ad9abb8046770973
Author: Piotr Findeisen <[email protected]>
AuthorDate: Tue Nov 19 13:01:58 2024 +0100

    Fix join on arrays of unhashable types and allow hash join on all types supported at run-time (#13388)
    
    * Remove unused code paths from create_hashes
    
    The `downcast_primitive_array!` macro handles all primitive types
    and only then delegates to fallbacks. It handles Decimal128 and
    Decimal256 internally.
    
    * Fix join on arrays of unhashable types and allow hash join on all types supported at run-time #13388
    
    Update can_hash to match the data types currently supported by create_hashes.
    
    * Rename table_with_many_types in tests
    
    * Test join on binary is hash join
---
 datafusion/common/src/hash_utils.rs                | 10 +---
 datafusion/expr/src/utils.rs                       | 59 +++++++++++++++-------
 datafusion/sqllogictest/src/test_context.rs        |  9 +++-
 .../test_files/information_schema_columns.slt      | 16 +++---
 datafusion/sqllogictest/test_files/joins.slt       | 21 ++++++++
 5 files changed, 79 insertions(+), 36 deletions(-)

diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs
index 8bd646626e..e18d70844d 100644
--- a/datafusion/common/src/hash_utils.rs
+++ b/datafusion/common/src/hash_utils.rs
@@ -32,7 +32,7 @@ use arrow_buffer::IntervalMonthDayNano;
 use crate::cast::{
     as_binary_view_array, as_boolean_array, as_fixed_size_list_array,
     as_generic_binary_array, as_large_list_array, as_list_array, as_map_array,
-    as_primitive_array, as_string_array, as_string_view_array, as_struct_array,
+    as_string_array, as_string_view_array, as_struct_array,
 };
 use crate::error::Result;
 #[cfg(not(feature = "force_hash_collisions"))]
@@ -392,14 +392,6 @@ pub fn create_hashes<'a>(
                 let array: &FixedSizeBinaryArray = 
array.as_any().downcast_ref().unwrap();
                 hash_array(array, random_state, hashes_buffer, rehash)
             }
-            DataType::Decimal128(_, _) => {
-                let array = as_primitive_array::<Decimal128Type>(array)?;
-                hash_array_primitive(array, random_state, hashes_buffer, 
rehash)
-            }
-            DataType::Decimal256(_, _) => {
-                let array = as_primitive_array::<Decimal256Type>(array)?;
-                hash_array_primitive(array, random_state, hashes_buffer, 
rehash)
-            }
             DataType::Dictionary(_, _) => downcast_dictionary_array! {
                 array => hash_dictionary(array, random_state, hashes_buffer, 
rehash)?,
                 _ => unreachable!()
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index c22ee244fe..6f7c5d3792 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -29,7 +29,7 @@ use crate::{
 };
 use datafusion_expr_common::signature::{Signature, TypeSignature};
 
-use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
+use arrow::datatypes::{DataType, Field, Schema};
 use datafusion_common::tree_node::{
     Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
 };
@@ -958,7 +958,7 @@ pub(crate) fn find_column_indexes_referenced_by_expr(
 
 /// Can this data type be used in hash join equal conditions??
 /// Data types here come from function 'equal_rows', if more data types are 
supported
-/// in equal_rows(hash join), add those data types here to generate join 
logical plan.
+/// in create_hashes, add those data types here to generate join logical plan.
 pub fn can_hash(data_type: &DataType) -> bool {
     match data_type {
         DataType::Null => true,
@@ -971,31 +971,38 @@ pub fn can_hash(data_type: &DataType) -> bool {
         DataType::UInt16 => true,
         DataType::UInt32 => true,
         DataType::UInt64 => true,
+        DataType::Float16 => true,
         DataType::Float32 => true,
         DataType::Float64 => true,
-        DataType::Timestamp(time_unit, _) => match time_unit {
-            TimeUnit::Second => true,
-            TimeUnit::Millisecond => true,
-            TimeUnit::Microsecond => true,
-            TimeUnit::Nanosecond => true,
-        },
+        DataType::Decimal128(_, _) => true,
+        DataType::Decimal256(_, _) => true,
+        DataType::Timestamp(_, _) => true,
         DataType::Utf8 => true,
         DataType::LargeUtf8 => true,
         DataType::Utf8View => true,
-        DataType::Decimal128(_, _) => true,
+        DataType::Binary => true,
+        DataType::LargeBinary => true,
+        DataType::BinaryView => true,
         DataType::Date32 => true,
         DataType::Date64 => true,
+        DataType::Time32(_) => true,
+        DataType::Time64(_) => true,
+        DataType::Duration(_) => true,
+        DataType::Interval(_) => true,
         DataType::FixedSizeBinary(_) => true,
-        DataType::Dictionary(key_type, value_type)
-            if *value_type.as_ref() == DataType::Utf8 =>
-        {
-            DataType::is_dictionary_key_type(key_type)
+        DataType::Dictionary(key_type, value_type) => {
+            DataType::is_dictionary_key_type(key_type) && can_hash(value_type)
         }
-        DataType::List(_) => true,
-        DataType::LargeList(_) => true,
-        DataType::FixedSizeList(_, _) => true,
+        DataType::List(value_type) => can_hash(value_type.data_type()),
+        DataType::LargeList(value_type) => can_hash(value_type.data_type()),
+        DataType::FixedSizeList(value_type, _) => 
can_hash(value_type.data_type()),
+        DataType::Map(map_struct, true | false) => 
can_hash(map_struct.data_type()),
         DataType::Struct(fields) => fields.iter().all(|f| 
can_hash(f.data_type())),
-        _ => false,
+
+        DataType::ListView(_)
+        | DataType::LargeListView(_)
+        | DataType::Union(_, _)
+        | DataType::RunEndEncoded(_, _) => false,
     }
 }
 
@@ -1403,6 +1410,7 @@ mod tests {
         test::function_stub::max_udaf, test::function_stub::min_udaf,
         test::function_stub::sum_udaf, Cast, ExprFunctionExt, 
WindowFunctionDefinition,
     };
+    use arrow::datatypes::{UnionFields, UnionMode};
 
     #[test]
     fn test_group_window_expr_by_sort_keys_empty_case() -> Result<()> {
@@ -1805,4 +1813,21 @@ mod tests {
         assert!(accum.contains(&Column::from_name("a")));
         Ok(())
     }
+
+    #[test]
+    fn test_can_hash() {
+        let union_fields: UnionFields = [
+            (0, Arc::new(Field::new("A", DataType::Int32, true))),
+            (1, Arc::new(Field::new("B", DataType::Float64, true))),
+        ]
+        .into_iter()
+        .collect();
+
+        let union_type = DataType::Union(union_fields, UnionMode::Sparse);
+        assert!(!can_hash(&union_type));
+
+        let list_union_type =
+            DataType::List(Arc::new(Field::new("my_union", union_type, true)));
+        assert!(!can_hash(&list_union_type));
+    }
 }
diff --git a/datafusion/sqllogictest/src/test_context.rs b/datafusion/sqllogictest/src/test_context.rs
index 477f225443..2466303c32 100644
--- a/datafusion/sqllogictest/src/test_context.rs
+++ b/datafusion/sqllogictest/src/test_context.rs
@@ -106,6 +106,8 @@ impl TestContext {
                 let example_udf = create_example_udf();
                 test_ctx.ctx.register_udf(example_udf);
                 register_partition_table(&mut test_ctx).await;
+                info!("Registering table with many types");
+                register_table_with_many_types(test_ctx.session_ctx()).await;
             }
             "metadata.slt" => {
                 info!("Registering metadata table tables");
@@ -251,8 +253,11 @@ pub async fn register_table_with_many_types(ctx: &SessionContext) {
         .unwrap();
     ctx.register_catalog("my_catalog", Arc::new(catalog));
 
-    ctx.register_table("my_catalog.my_schema.t2", table_with_many_types())
-        .unwrap();
+    ctx.register_table(
+        "my_catalog.my_schema.table_with_many_types",
+        table_with_many_types(),
+    )
+    .unwrap();
 }
 
 pub async fn register_table_with_map(ctx: &SessionContext) {
diff --git a/datafusion/sqllogictest/test_files/information_schema_columns.slt b/datafusion/sqllogictest/test_files/information_schema_columns.slt
index 7cf845c16d..d348a764fa 100644
--- a/datafusion/sqllogictest/test_files/information_schema_columns.slt
+++ b/datafusion/sqllogictest/test_files/information_schema_columns.slt
@@ -37,17 +37,17 @@ query TTTTITTTIIIIIIT rowsort
 SELECT * from information_schema.columns;
 ----
 my_catalog my_schema t1 i 0 NULL YES Int32 NULL NULL 32 2 NULL NULL NULL
-my_catalog my_schema t2 binary_col 4 NULL NO Binary NULL 2147483647 NULL NULL 
NULL NULL NULL
-my_catalog my_schema t2 float64_col 1 NULL YES Float64 NULL NULL 24 2 NULL 
NULL NULL
-my_catalog my_schema t2 int32_col 0 NULL NO Int32 NULL NULL 32 2 NULL NULL NULL
-my_catalog my_schema t2 large_binary_col 5 NULL NO LargeBinary NULL 
9223372036854775807 NULL NULL NULL NULL NULL
-my_catalog my_schema t2 large_utf8_col 3 NULL NO LargeUtf8 NULL 
9223372036854775807 NULL NULL NULL NULL NULL
-my_catalog my_schema t2 timestamp_nanos 6 NULL NO Timestamp(Nanosecond, None) 
NULL NULL NULL NULL NULL NULL NULL
-my_catalog my_schema t2 utf8_col 2 NULL YES Utf8 NULL 2147483647 NULL NULL 
NULL NULL NULL
+my_catalog my_schema table_with_many_types binary_col 4 NULL NO Binary NULL 
2147483647 NULL NULL NULL NULL NULL
+my_catalog my_schema table_with_many_types float64_col 1 NULL YES Float64 NULL 
NULL 24 2 NULL NULL NULL
+my_catalog my_schema table_with_many_types int32_col 0 NULL NO Int32 NULL NULL 
32 2 NULL NULL NULL
+my_catalog my_schema table_with_many_types large_binary_col 5 NULL NO 
LargeBinary NULL 9223372036854775807 NULL NULL NULL NULL NULL
+my_catalog my_schema table_with_many_types large_utf8_col 3 NULL NO LargeUtf8 
NULL 9223372036854775807 NULL NULL NULL NULL NULL
+my_catalog my_schema table_with_many_types timestamp_nanos 6 NULL NO 
Timestamp(Nanosecond, None) NULL NULL NULL NULL NULL NULL NULL
+my_catalog my_schema table_with_many_types utf8_col 2 NULL YES Utf8 NULL 
2147483647 NULL NULL NULL NULL NULL
 
 # Cleanup
 statement ok
 drop table t1
 
 statement ok
-drop table t2
+drop table table_with_many_types
diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt
index d45dbc7ee1..e636e93007 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -4292,3 +4292,24 @@ query T
 select * from table1 as t1 natural join table1_stringview as t2;
 ----
 foo
+
+query TT
+EXPLAIN SELECT count(*)
+FROM my_catalog.my_schema.table_with_many_types AS l
+JOIN my_catalog.my_schema.table_with_many_types AS r ON l.binary_col = 
r.binary_col
+----
+logical_plan
+01)Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
+02)--Projection: 
+03)----Inner Join: l.binary_col = r.binary_col
+04)------SubqueryAlias: l
+05)--------TableScan: my_catalog.my_schema.table_with_many_types 
projection=[binary_col]
+06)------SubqueryAlias: r
+07)--------TableScan: my_catalog.my_schema.table_with_many_types 
projection=[binary_col]
+physical_plan
+01)AggregateExec: mode=Single, gby=[], aggr=[count(*)]
+02)--ProjectionExec: expr=[]
+03)----CoalesceBatchesExec: target_batch_size=3
+04)------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(binary_col@0, 
binary_col@0)]
+05)--------MemoryExec: partitions=1, partition_sizes=[1]
+06)--------MemoryExec: partitions=1, partition_sizes=[1]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to