This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 91cfb69903 feat(proto): Add protobuf serialization for HashExpr
(#19379)
91cfb69903 is described below
commit 91cfb6990319a672195f97d0b2b179fa9634fec9
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Fri Dec 19 08:35:42 2025 -0600
feat(proto): Add protobuf serialization for HashExpr (#19379)
## Summary
This PR adds protobuf serialization/deserialization support for
`HashExpr`, enabling distributed query execution to serialize hash
expressions used in hash joins and repartitioning.
This is a follow-up to #18393, which introduced `HashExpr` but did not add
serialization support.
The missing support causes errors when serialization is triggered on a
query that pushes down dynamic filters from a `HashJoinExec`.
As of #18393 `HashJoinExec` produces filters of the form:
```sql
CASE (hash_repartition % 2)
WHEN 0 THEN
a >= ab AND a <= ab AND
b >= bb AND b <= bb AND
hash_lookup(a,b)
WHEN 1 THEN
a >= aa AND a <= aa AND
b >= ba AND b <= ba AND
hash_lookup(a,b)
ELSE
FALSE
END
```
Here, `hash_lookup` is an expression that holds a reference to a given
partition's hash join hash table and checks for membership.
Since we created these new expressions but didn't make any of them
serializable, any attempt to do a distributed query or similar would run
into errors.
In https://github.com/apache/datafusion/pull/19300 we fixed
`hash_lookup` by replacing it with `true` since it can't be serialized
across the wire (we'd have to send the entire hash table). The logic was
that this preserves the bounds checks, which are still valuable.
This PR handles `hash_repartition` which determines which partition (and
hence which branch of the `CASE` expression) the row belongs to. For
this expression we *can* serialize it, so that's what I'm doing in this
PR.
### Key Changes
- **SeededRandomState wrapper**: Added a `SeededRandomState` struct that
wraps `ahash::RandomState` while preserving the seeds used to create it.
This is necessary because `RandomState` doesn't expose seeds after
creation, but we need them for serialization.
- **Updated seed constants**: Changed `HASH_JOIN_SEED` and
`REPARTITION_RANDOM_STATE` constants to use `SeededRandomState` instead
of raw `RandomState`.
- **HashExpr enhancements**:
- Changed `HashExpr` to use `SeededRandomState`
- Added getter methods: `on_columns()`, `seeds()`, `description()`
- Exported `HashExpr` and `SeededRandomState` from the joins module
- **Protobuf support**:
- Added `PhysicalHashExprNode` message to `datafusion.proto` with fields
for `on_columns`, seeds (4 `u64` values), and `description`
- Implemented serialization in `to_proto.rs`
- Implemented deserialization in `from_proto.rs`
## Test plan
- [x] Added roundtrip test in `roundtrip_physical_plan.rs` that creates
a `HashExpr`, serializes it, deserializes it, and verifies the result
- [x] All existing hash join tests pass (583 tests)
- [x] All proto roundtrip tests pass
🤖 Generated with [Claude Code](https://claude.com/claude-code)
---------
Co-authored-by: Claude Opus 4.5 <[email protected]>
---
.../physical-plan/src/joins/hash_join/exec.rs | 16 +-
.../physical-plan/src/joins/hash_join/mod.rs | 2 +-
.../src/joins/hash_join/partitioned_hash_eval.rs | 349 ++++++++++++++++++++-
.../src/joins/hash_join/shared_bounds.rs | 12 +-
.../physical-plan/src/joins/join_hash_map.rs | 6 +
datafusion/physical-plan/src/joins/mod.rs | 2 +-
datafusion/physical-plan/src/repartition/mod.rs | 11 +-
datafusion/proto/proto/datafusion.proto | 11 +
datafusion/proto/src/generated/pbjson.rs | 207 ++++++++++++
datafusion/proto/src/generated/prost.rs | 19 +-
datafusion/proto/src/physical_plan/from_proto.rs | 15 +
datafusion/proto/src/physical_plan/to_proto.rs | 16 +-
.../proto/tests/cases/roundtrip_physical_plan.rs | 33 ++
13 files changed, 670 insertions(+), 29 deletions(-)
diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs
b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index 13d1e0a982..bd92cf4964 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -86,9 +86,11 @@ use
datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
use futures::TryStreamExt;
use parking_lot::Mutex;
+use super::partitioned_hash_eval::SeededRandomState;
+
/// Hard-coded seed to ensure hash values from the hash join differ from
`RepartitionExec`, avoiding collisions.
-pub(crate) const HASH_JOIN_SEED: RandomState =
- RandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64);
+pub(crate) const HASH_JOIN_SEED: SeededRandomState =
+ SeededRandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as
u64);
/// HashTable and input data for the left (build side) of a join
pub(super) struct JoinLeftData {
@@ -334,8 +336,8 @@ pub struct HashJoinExec {
/// Each output stream waits on the `OnceAsync` to signal the completion of
/// the hash table creation.
left_fut: Arc<OnceAsync<JoinLeftData>>,
- /// Shared the `RandomState` for the hashing algorithm
- random_state: RandomState,
+ /// Shared the `SeededRandomState` for the hashing algorithm (seeds
preserved for serialization)
+ random_state: SeededRandomState,
/// Partitioning mode to use
pub mode: PartitionMode,
/// Execution metrics
@@ -930,7 +932,7 @@ impl ExecutionPlan for HashJoinExec {
MemoryConsumer::new("HashJoinInput").register(context.memory_pool());
Ok(collect_left_input(
- self.random_state.clone(),
+ self.random_state.random_state().clone(),
left_stream,
on_left.clone(),
join_metrics.clone(),
@@ -958,7 +960,7 @@ impl ExecutionPlan for HashJoinExec {
.register(context.memory_pool());
OnceFut::new(collect_left_input(
- self.random_state.clone(),
+ self.random_state.random_state().clone(),
left_stream,
on_left.clone(),
join_metrics.clone(),
@@ -1041,7 +1043,7 @@ impl ExecutionPlan for HashJoinExec {
self.filter.clone(),
self.join_type,
right_stream,
- self.random_state.clone(),
+ self.random_state.random_state().clone(),
join_metrics,
column_indices_after_projection,
self.null_equality,
diff --git a/datafusion/physical-plan/src/joins/hash_join/mod.rs
b/datafusion/physical-plan/src/joins/hash_join/mod.rs
index 352209e9c3..8592e1d968 100644
--- a/datafusion/physical-plan/src/joins/hash_join/mod.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/mod.rs
@@ -18,7 +18,7 @@
//! [`HashJoinExec`] Partitioned Hash Join Operator
pub use exec::HashJoinExec;
-pub use partitioned_hash_eval::HashTableLookupExpr;
+pub use partitioned_hash_eval::{HashExpr, HashTableLookupExpr,
SeededRandomState};
mod exec;
mod inlist_builder;
diff --git
a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs
b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs
index b91f5f46d7..4c437e8131 100644
--- a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs
@@ -34,6 +34,36 @@ use datafusion_physical_expr_common::physical_expr::{
use crate::{hash_utils::create_hashes, joins::utils::JoinHashMapType};
+/// RandomState wrapper that preserves the seeds used to create it.
+///
+/// This is needed because ahash's `RandomState` doesn't expose its seeds
after creation,
+/// but we need them for serialization (e.g., protobuf serde).
+#[derive(Clone, Debug)]
+pub struct SeededRandomState {
+ random_state: RandomState,
+ seeds: (u64, u64, u64, u64),
+}
+
+impl SeededRandomState {
+ /// Create a new SeededRandomState with the given seeds.
+ pub const fn with_seeds(k0: u64, k1: u64, k2: u64, k3: u64) -> Self {
+ Self {
+ random_state: RandomState::with_seeds(k0, k1, k2, k3),
+ seeds: (k0, k1, k2, k3),
+ }
+ }
+
+ /// Get the inner RandomState.
+ pub fn random_state(&self) -> &RandomState {
+ &self.random_state
+ }
+
+ /// Get the seeds used to create this RandomState.
+ pub fn seeds(&self) -> (u64, u64, u64, u64) {
+ self.seeds
+ }
+}
+
/// Physical expression that computes hash values for a set of columns
///
/// This expression computes the hash of join key columns using a specific
RandomState.
@@ -45,8 +75,8 @@ use crate::{hash_utils::create_hashes,
joins::utils::JoinHashMapType};
pub struct HashExpr {
/// Columns to hash
on_columns: Vec<PhysicalExprRef>,
- /// Random state for hashing
- random_state: RandomState,
+ /// Random state for hashing (with seeds preserved for serialization)
+ random_state: SeededRandomState,
/// Description for display
description: String,
}
@@ -56,11 +86,11 @@ impl HashExpr {
///
/// # Arguments
/// * `on_columns` - Columns to hash
- /// * `random_state` - RandomState for hashing
+ /// * `random_state` - SeededRandomState for hashing
/// * `description` - Description for debugging (e.g., "hash_repartition",
"hash_join")
- pub(super) fn new(
+ pub fn new(
on_columns: Vec<PhysicalExprRef>,
- random_state: RandomState,
+ random_state: SeededRandomState,
description: String,
) -> Self {
Self {
@@ -69,6 +99,21 @@ impl HashExpr {
description,
}
}
+
+ /// Get the columns being hashed.
+ pub fn on_columns(&self) -> &[PhysicalExprRef] {
+ &self.on_columns
+ }
+
+ /// Get the seeds used for hashing.
+ pub fn seeds(&self) -> (u64, u64, u64, u64) {
+ self.random_state.seeds()
+ }
+
+ /// Get the description.
+ pub fn description(&self) -> &str {
+ &self.description
+ }
}
impl std::fmt::Debug for HashExpr {
@@ -79,7 +124,8 @@ impl std::fmt::Debug for HashExpr {
.map(|e| e.to_string())
.collect::<Vec<_>>()
.join(", ");
- write!(f, "{}({})", self.description, cols)
+ let (s1, s2, s3, s4) = self.seeds();
+ write!(f, "{}({cols}, [{s1},{s2},{s3},{s4}])", self.description)
}
}
@@ -87,12 +133,15 @@ impl Hash for HashExpr {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.on_columns.dyn_hash(state);
self.description.hash(state);
+ self.seeds().hash(state);
}
}
impl PartialEq for HashExpr {
fn eq(&self, other: &Self) -> bool {
- self.on_columns == other.on_columns && self.description ==
other.description
+ self.on_columns == other.on_columns
+ && self.description == other.description
+ && self.seeds() == other.seeds()
}
}
@@ -147,7 +196,11 @@ impl PhysicalExpr for HashExpr {
// Compute hashes
let mut hashes_buffer = vec![0; num_rows];
- create_hashes(&keys_values, &self.random_state, &mut hashes_buffer)?;
+ create_hashes(
+ &keys_values,
+ self.random_state.random_state(),
+ &mut hashes_buffer,
+ )?;
Ok(ColumnarValue::Array(Arc::new(UInt64Array::from(
hashes_buffer,
@@ -206,13 +259,29 @@ impl Hash for HashTableLookupExpr {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.hash_expr.dyn_hash(state);
self.description.hash(state);
+ // Note that we compare hash_map by pointer equality.
+ // Actually comparing the contents of the hash maps would be expensive.
+ // The way these hash maps are used in actuality is that HashJoinExec
creates
+ // one per partition per query execution, thus it is never possible
for two different
+ // hash maps to have the same content in practice.
+ // Theoretically this is a public API and users could create identical
hash maps,
+ // but that seems unlikely and not worth paying the cost of deep
comparison all the time.
+ Arc::as_ptr(&self.hash_map).hash(state);
}
}
impl PartialEq for HashTableLookupExpr {
fn eq(&self, other: &Self) -> bool {
- Arc::ptr_eq(&self.hash_expr, &other.hash_expr)
+ // Note that we compare hash_map by pointer equality.
+ // Actually comparing the contents of the hash maps would be expensive.
+ // The way these hash maps are used in actuality is that HashJoinExec
creates
+ // one per partition per query execution, thus it is never possible
for two different
+ // hash maps to have the same content in practice.
+ // Theoretically this is a public API and users could create identical
hash maps,
+ // but that seems unlikely and not worth paying the cost of deep
comparison all the time.
+ self.hash_expr.as_ref() == other.hash_expr.as_ref()
&& self.description == other.description
+ && Arc::ptr_eq(&self.hash_map, &other.hash_map)
}
}
@@ -294,3 +363,265 @@ impl PhysicalExpr for HashTableLookupExpr {
write!(f, "{}", self.description)
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::joins::join_hash_map::JoinHashMapU32;
+ use datafusion_physical_expr::expressions::Column;
+ use std::collections::hash_map::DefaultHasher;
+ use std::hash::Hasher;
+
+ fn compute_hash<T: Hash>(value: &T) -> u64 {
+ let mut hasher = DefaultHasher::new();
+ value.hash(&mut hasher);
+ hasher.finish()
+ }
+
+ #[test]
+ fn test_hash_expr_eq_same() {
+ let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0));
+ let col_b: PhysicalExprRef = Arc::new(Column::new("b", 1));
+
+ let expr1 = HashExpr::new(
+ vec![Arc::clone(&col_a), Arc::clone(&col_b)],
+ SeededRandomState::with_seeds(1, 2, 3, 4),
+ "test_hash".to_string(),
+ );
+
+ let expr2 = HashExpr::new(
+ vec![Arc::clone(&col_a), Arc::clone(&col_b)],
+ SeededRandomState::with_seeds(1, 2, 3, 4),
+ "test_hash".to_string(),
+ );
+
+ assert_eq!(expr1, expr2);
+ }
+
+ #[test]
+ fn test_hash_expr_eq_different_columns() {
+ let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0));
+ let col_b: PhysicalExprRef = Arc::new(Column::new("b", 1));
+ let col_c: PhysicalExprRef = Arc::new(Column::new("c", 2));
+
+ let expr1 = HashExpr::new(
+ vec![Arc::clone(&col_a), Arc::clone(&col_b)],
+ SeededRandomState::with_seeds(1, 2, 3, 4),
+ "test_hash".to_string(),
+ );
+
+ let expr2 = HashExpr::new(
+ vec![Arc::clone(&col_a), Arc::clone(&col_c)],
+ SeededRandomState::with_seeds(1, 2, 3, 4),
+ "test_hash".to_string(),
+ );
+
+ assert_ne!(expr1, expr2);
+ }
+
+ #[test]
+ fn test_hash_expr_eq_different_description() {
+ let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0));
+
+ let expr1 = HashExpr::new(
+ vec![Arc::clone(&col_a)],
+ SeededRandomState::with_seeds(1, 2, 3, 4),
+ "hash_one".to_string(),
+ );
+
+ let expr2 = HashExpr::new(
+ vec![Arc::clone(&col_a)],
+ SeededRandomState::with_seeds(1, 2, 3, 4),
+ "hash_two".to_string(),
+ );
+
+ assert_ne!(expr1, expr2);
+ }
+
+ #[test]
+ fn test_hash_expr_eq_different_seeds() {
+ let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0));
+
+ let expr1 = HashExpr::new(
+ vec![Arc::clone(&col_a)],
+ SeededRandomState::with_seeds(1, 2, 3, 4),
+ "test_hash".to_string(),
+ );
+
+ let expr2 = HashExpr::new(
+ vec![Arc::clone(&col_a)],
+ SeededRandomState::with_seeds(5, 6, 7, 8),
+ "test_hash".to_string(),
+ );
+
+ assert_ne!(expr1, expr2);
+ }
+
+ #[test]
+ fn test_hash_expr_hash_consistency() {
+ let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0));
+ let col_b: PhysicalExprRef = Arc::new(Column::new("b", 1));
+
+ let expr1 = HashExpr::new(
+ vec![Arc::clone(&col_a), Arc::clone(&col_b)],
+ SeededRandomState::with_seeds(1, 2, 3, 4),
+ "test_hash".to_string(),
+ );
+
+ let expr2 = HashExpr::new(
+ vec![Arc::clone(&col_a), Arc::clone(&col_b)],
+ SeededRandomState::with_seeds(1, 2, 3, 4),
+ "test_hash".to_string(),
+ );
+
+ // Equal expressions should have equal hashes
+ assert_eq!(expr1, expr2);
+ assert_eq!(compute_hash(&expr1), compute_hash(&expr2));
+ }
+
+ #[test]
+ fn test_hash_table_lookup_expr_eq_same() {
+ let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0));
+ let hash_expr: PhysicalExprRef = Arc::new(HashExpr::new(
+ vec![Arc::clone(&col_a)],
+ SeededRandomState::with_seeds(1, 2, 3, 4),
+ "inner_hash".to_string(),
+ ));
+ let hash_map: Arc<dyn JoinHashMapType> =
+ Arc::new(JoinHashMapU32::with_capacity(10));
+
+ let expr1 = HashTableLookupExpr::new(
+ Arc::clone(&hash_expr),
+ Arc::clone(&hash_map),
+ "lookup".to_string(),
+ );
+
+ let expr2 = HashTableLookupExpr::new(
+ Arc::clone(&hash_expr),
+ Arc::clone(&hash_map),
+ "lookup".to_string(),
+ );
+
+ assert_eq!(expr1, expr2);
+ }
+
+ #[test]
+ fn test_hash_table_lookup_expr_eq_different_hash_expr() {
+ let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0));
+ let col_b: PhysicalExprRef = Arc::new(Column::new("b", 1));
+
+ let hash_expr1: PhysicalExprRef = Arc::new(HashExpr::new(
+ vec![Arc::clone(&col_a)],
+ SeededRandomState::with_seeds(1, 2, 3, 4),
+ "inner_hash".to_string(),
+ ));
+
+ let hash_expr2: PhysicalExprRef = Arc::new(HashExpr::new(
+ vec![Arc::clone(&col_b)],
+ SeededRandomState::with_seeds(1, 2, 3, 4),
+ "inner_hash".to_string(),
+ ));
+
+ let hash_map: Arc<dyn JoinHashMapType> =
+ Arc::new(JoinHashMapU32::with_capacity(10));
+
+ let expr1 = HashTableLookupExpr::new(
+ Arc::clone(&hash_expr1),
+ Arc::clone(&hash_map),
+ "lookup".to_string(),
+ );
+
+ let expr2 = HashTableLookupExpr::new(
+ Arc::clone(&hash_expr2),
+ Arc::clone(&hash_map),
+ "lookup".to_string(),
+ );
+
+ assert_ne!(expr1, expr2);
+ }
+
+ #[test]
+ fn test_hash_table_lookup_expr_eq_different_description() {
+ let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0));
+ let hash_expr: PhysicalExprRef = Arc::new(HashExpr::new(
+ vec![Arc::clone(&col_a)],
+ SeededRandomState::with_seeds(1, 2, 3, 4),
+ "inner_hash".to_string(),
+ ));
+ let hash_map: Arc<dyn JoinHashMapType> =
+ Arc::new(JoinHashMapU32::with_capacity(10));
+
+ let expr1 = HashTableLookupExpr::new(
+ Arc::clone(&hash_expr),
+ Arc::clone(&hash_map),
+ "lookup_one".to_string(),
+ );
+
+ let expr2 = HashTableLookupExpr::new(
+ Arc::clone(&hash_expr),
+ Arc::clone(&hash_map),
+ "lookup_two".to_string(),
+ );
+
+ assert_ne!(expr1, expr2);
+ }
+
+ #[test]
+ fn test_hash_table_lookup_expr_eq_different_hash_map() {
+ let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0));
+ let hash_expr: PhysicalExprRef = Arc::new(HashExpr::new(
+ vec![Arc::clone(&col_a)],
+ SeededRandomState::with_seeds(1, 2, 3, 4),
+ "inner_hash".to_string(),
+ ));
+
+ // Two different Arc pointers (even with same content) should not be
equal
+ let hash_map1: Arc<dyn JoinHashMapType> =
+ Arc::new(JoinHashMapU32::with_capacity(10));
+ let hash_map2: Arc<dyn JoinHashMapType> =
+ Arc::new(JoinHashMapU32::with_capacity(10));
+
+ let expr1 = HashTableLookupExpr::new(
+ Arc::clone(&hash_expr),
+ hash_map1,
+ "lookup".to_string(),
+ );
+
+ let expr2 = HashTableLookupExpr::new(
+ Arc::clone(&hash_expr),
+ hash_map2,
+ "lookup".to_string(),
+ );
+
+ // Different Arc pointers means not equal (uses Arc::ptr_eq)
+ assert_ne!(expr1, expr2);
+ }
+
+ #[test]
+ fn test_hash_table_lookup_expr_hash_consistency() {
+ let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0));
+ let hash_expr: PhysicalExprRef = Arc::new(HashExpr::new(
+ vec![Arc::clone(&col_a)],
+ SeededRandomState::with_seeds(1, 2, 3, 4),
+ "inner_hash".to_string(),
+ ));
+ let hash_map: Arc<dyn JoinHashMapType> =
+ Arc::new(JoinHashMapU32::with_capacity(10));
+
+ let expr1 = HashTableLookupExpr::new(
+ Arc::clone(&hash_expr),
+ Arc::clone(&hash_map),
+ "lookup".to_string(),
+ );
+
+ let expr2 = HashTableLookupExpr::new(
+ Arc::clone(&hash_expr),
+ Arc::clone(&hash_map),
+ "lookup".to_string(),
+ );
+
+ // Equal expressions should have equal hashes
+ assert_eq!(expr1, expr2);
+ assert_eq!(compute_hash(&expr1), compute_hash(&expr2));
+ }
+}
diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
index 5aa2bbb57d..7d34ce9acb 100644
--- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
@@ -26,10 +26,10 @@ use crate::ExecutionPlanProperties;
use crate::joins::PartitionMode;
use crate::joins::hash_join::exec::HASH_JOIN_SEED;
use crate::joins::hash_join::inlist_builder::build_struct_fields;
-use crate::joins::hash_join::partitioned_hash_eval::{HashExpr,
HashTableLookupExpr};
+use crate::joins::hash_join::partitioned_hash_eval::{
+ HashExpr, HashTableLookupExpr, SeededRandomState,
+};
use crate::joins::utils::JoinHashMapType;
-
-use ahash::RandomState;
use arrow::array::ArrayRef;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::config::ConfigOptions;
@@ -88,7 +88,7 @@ impl PartitionBounds {
fn create_membership_predicate(
on_right: &[PhysicalExprRef],
pushdown: PushdownStrategy,
- random_state: &RandomState,
+ random_state: &SeededRandomState,
schema: &Schema,
) -> Result<Option<Arc<dyn PhysicalExpr>>> {
match pushdown {
@@ -230,7 +230,7 @@ pub(crate) struct SharedBuildAccumulator {
on_right: Vec<PhysicalExprRef>,
/// Random state for partitioning (RepartitionExec's hash function with
0,0,0,0 seeds)
/// Used for PartitionedHashLookupPhysicalExpr
- repartition_random_state: RandomState,
+ repartition_random_state: SeededRandomState,
/// Schema of the probe (right) side for evaluating filter expressions
probe_schema: Arc<Schema>,
}
@@ -308,7 +308,7 @@ impl SharedBuildAccumulator {
right_child: &dyn ExecutionPlan,
dynamic_filter: Arc<DynamicFilterPhysicalExpr>,
on_right: Vec<PhysicalExprRef>,
- repartition_random_state: RandomState,
+ repartition_random_state: SeededRandomState,
) -> Self {
// Troubleshooting: If partition counts are incorrect, verify this
logic matches
// the actual execution pattern in collect_build_side()
diff --git a/datafusion/physical-plan/src/joins/join_hash_map.rs
b/datafusion/physical-plan/src/joins/join_hash_map.rs
index ed370fdb16..b0ed6dcc7c 100644
--- a/datafusion/physical-plan/src/joins/join_hash_map.rs
+++ b/datafusion/physical-plan/src/joins/join_hash_map.rs
@@ -94,6 +94,12 @@ use hashbrown::hash_table::Entry::{Occupied, Vacant};
///
/// At runtime we choose between using `JoinHashMapU32` and `JoinHashMapU64`
which oth implement
/// `JoinHashMapType`.
+///
+/// ## Note on use of this trait as a public API
+/// This is currently a public trait but is mainly intended for internal use
within DataFusion.
+/// For example, we may compare references to `JoinHashMapType`
implementations by pointer equality
+/// rather than deep equality of contents, as deep equality would be expensive
and in our usage
+/// patterns it is impossible for two different hash maps to have identical
contents in a practical sense.
pub trait JoinHashMapType: Send + Sync {
fn extend_zero(&mut self, len: usize);
diff --git a/datafusion/physical-plan/src/joins/mod.rs
b/datafusion/physical-plan/src/joins/mod.rs
index 0ca77b3cae..3ff61ecf1d 100644
--- a/datafusion/physical-plan/src/joins/mod.rs
+++ b/datafusion/physical-plan/src/joins/mod.rs
@@ -20,7 +20,7 @@
use arrow::array::BooleanBufferBuilder;
pub use cross_join::CrossJoinExec;
use datafusion_physical_expr::PhysicalExprRef;
-pub use hash_join::{HashJoinExec, HashTableLookupExpr};
+pub use hash_join::{HashExpr, HashJoinExec, HashTableLookupExpr,
SeededRandomState};
pub use nested_loop_join::NestedLoopJoinExec;
use parking_lot::Mutex;
// Note: SortMergeJoin is not used in plans yet
diff --git a/datafusion/physical-plan/src/repartition/mod.rs
b/datafusion/physical-plan/src/repartition/mod.rs
index 5c9472182b..1efdaaabc7 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -61,6 +61,7 @@ use crate::filter_pushdown::{
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
FilterPushdownPropagation,
};
+use crate::joins::SeededRandomState;
use crate::sort_pushdown::SortOrderPushdownResult;
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
@@ -429,8 +430,8 @@ enum BatchPartitionerState {
/// Fixed RandomState used for hash repartitioning to ensure consistent
behavior across
/// executions and runs.
-pub const REPARTITION_RANDOM_STATE: ahash::RandomState =
- ahash::RandomState::with_seeds(0, 0, 0, 0);
+pub const REPARTITION_RANDOM_STATE: SeededRandomState =
+ SeededRandomState::with_seeds(0, 0, 0, 0);
impl BatchPartitioner {
/// Create a new [`BatchPartitioner`] with the provided [`Partitioning`]
@@ -514,7 +515,11 @@ impl BatchPartitioner {
hash_buffer.clear();
hash_buffer.resize(batch.num_rows(), 0);
- create_hashes(&arrays, &REPARTITION_RANDOM_STATE,
hash_buffer)?;
+ create_hashes(
+ &arrays,
+ REPARTITION_RANDOM_STATE.random_state(),
+ hash_buffer,
+ )?;
let mut indices: Vec<_> = (0..*partitions)
.map(|_| Vec::with_capacity(batch.num_rows()))
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index 6616f77a5b..bd7dd3a6af 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -872,6 +872,8 @@ message PhysicalExprNode {
PhysicalExtensionExprNode extension = 19;
UnknownColumn unknown_column = 20;
+
+ PhysicalHashExprNode hash_expr = 21;
}
}
@@ -990,6 +992,15 @@ message PhysicalExtensionExprNode {
repeated PhysicalExprNode inputs = 2;
}
+message PhysicalHashExprNode {
+ repeated PhysicalExprNode on_columns = 1;
+ uint64 seed0 = 2;
+ uint64 seed1 = 3;
+ uint64 seed2 = 4;
+ uint64 seed3 = 5;
+ string description = 6;
+}
+
message FilterExecNode {
PhysicalPlanNode input = 1;
PhysicalExprNode expr = 2;
diff --git a/datafusion/proto/src/generated/pbjson.rs
b/datafusion/proto/src/generated/pbjson.rs
index cf3dcfe01b..e269606d16 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -15934,6 +15934,9 @@ impl serde::Serialize for PhysicalExprNode {
physical_expr_node::ExprType::UnknownColumn(v) => {
struct_ser.serialize_field("unknownColumn", v)?;
}
+ physical_expr_node::ExprType::HashExpr(v) => {
+ struct_ser.serialize_field("hashExpr", v)?;
+ }
}
}
struct_ser.end()
@@ -15976,6 +15979,8 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode {
"extension",
"unknown_column",
"unknownColumn",
+ "hash_expr",
+ "hashExpr",
];
#[allow(clippy::enum_variant_names)]
@@ -15998,6 +16003,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode {
LikeExpr,
Extension,
UnknownColumn,
+ HashExpr,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
@@ -16037,6 +16043,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode {
"likeExpr" | "like_expr" =>
Ok(GeneratedField::LikeExpr),
"extension" => Ok(GeneratedField::Extension),
"unknownColumn" | "unknown_column" =>
Ok(GeneratedField::UnknownColumn),
+ "hashExpr" | "hash_expr" =>
Ok(GeneratedField::HashExpr),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
}
}
@@ -16183,6 +16190,13 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode
{
return
Err(serde::de::Error::duplicate_field("unknownColumn"));
}
expr_type__ =
map_.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::UnknownColumn)
+;
+ }
+ GeneratedField::HashExpr => {
+ if expr_type__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("hashExpr"));
+ }
+ expr_type__ =
map_.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::HashExpr)
;
}
}
@@ -16419,6 +16433,199 @@ impl<'de> serde::Deserialize<'de> for
PhysicalExtensionNode {
deserializer.deserialize_struct("datafusion.PhysicalExtensionNode",
FIELDS, GeneratedVisitor)
}
}
+impl serde::Serialize for PhysicalHashExprNode {
+ #[allow(deprecated)]
+ fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
+ where
+ S: serde::Serializer,
+ {
+ use serde::ser::SerializeStruct;
+ let mut len = 0;
+ if !self.on_columns.is_empty() {
+ len += 1;
+ }
+ if self.seed0 != 0 {
+ len += 1;
+ }
+ if self.seed1 != 0 {
+ len += 1;
+ }
+ if self.seed2 != 0 {
+ len += 1;
+ }
+ if self.seed3 != 0 {
+ len += 1;
+ }
+ if !self.description.is_empty() {
+ len += 1;
+ }
+ let mut struct_ser =
serializer.serialize_struct("datafusion.PhysicalHashExprNode", len)?;
+ if !self.on_columns.is_empty() {
+ struct_ser.serialize_field("onColumns", &self.on_columns)?;
+ }
+ if self.seed0 != 0 {
+ #[allow(clippy::needless_borrow)]
+ #[allow(clippy::needless_borrows_for_generic_args)]
+ struct_ser.serialize_field("seed0",
ToString::to_string(&self.seed0).as_str())?;
+ }
+ if self.seed1 != 0 {
+ #[allow(clippy::needless_borrow)]
+ #[allow(clippy::needless_borrows_for_generic_args)]
+ struct_ser.serialize_field("seed1",
ToString::to_string(&self.seed1).as_str())?;
+ }
+ if self.seed2 != 0 {
+ #[allow(clippy::needless_borrow)]
+ #[allow(clippy::needless_borrows_for_generic_args)]
+ struct_ser.serialize_field("seed2",
ToString::to_string(&self.seed2).as_str())?;
+ }
+ if self.seed3 != 0 {
+ #[allow(clippy::needless_borrow)]
+ #[allow(clippy::needless_borrows_for_generic_args)]
+ struct_ser.serialize_field("seed3",
ToString::to_string(&self.seed3).as_str())?;
+ }
+ if !self.description.is_empty() {
+ struct_ser.serialize_field("description", &self.description)?;
+ }
+ struct_ser.end()
+ }
+}
+impl<'de> serde::Deserialize<'de> for PhysicalHashExprNode {
+ #[allow(deprecated)]
+ fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ const FIELDS: &[&str] = &[
+ "on_columns",
+ "onColumns",
+ "seed0",
+ "seed1",
+ "seed2",
+ "seed3",
+ "description",
+ ];
+
+ #[allow(clippy::enum_variant_names)]
+ enum GeneratedField {
+ OnColumns,
+ Seed0,
+ Seed1,
+ Seed2,
+ Seed3,
+ Description,
+ }
+ impl<'de> serde::Deserialize<'de> for GeneratedField {
+ fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ struct GeneratedVisitor;
+
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = GeneratedField;
+
+ fn expecting(&self, formatter: &mut
std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(formatter, "expected one of: {:?}", &FIELDS)
+ }
+
+ #[allow(unused_variables)]
+ fn visit_str<E>(self, value: &str) ->
std::result::Result<GeneratedField, E>
+ where
+ E: serde::de::Error,
+ {
+ match value {
+ "onColumns" | "on_columns" =>
Ok(GeneratedField::OnColumns),
+ "seed0" => Ok(GeneratedField::Seed0),
+ "seed1" => Ok(GeneratedField::Seed1),
+ "seed2" => Ok(GeneratedField::Seed2),
+ "seed3" => Ok(GeneratedField::Seed3),
+ "description" => Ok(GeneratedField::Description),
+ _ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
+ }
+ }
+ }
+ deserializer.deserialize_identifier(GeneratedVisitor)
+ }
+ }
+ struct GeneratedVisitor;
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = PhysicalHashExprNode;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) ->
std::fmt::Result {
+ formatter.write_str("struct datafusion.PhysicalHashExprNode")
+ }
+
+ fn visit_map<V>(self, mut map_: V) ->
std::result::Result<PhysicalHashExprNode, V::Error>
+ where
+ V: serde::de::MapAccess<'de>,
+ {
+ let mut on_columns__ = None;
+ let mut seed0__ = None;
+ let mut seed1__ = None;
+ let mut seed2__ = None;
+ let mut seed3__ = None;
+ let mut description__ = None;
+ while let Some(k) = map_.next_key()? {
+ match k {
+ GeneratedField::OnColumns => {
+ if on_columns__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("onColumns"));
+ }
+ on_columns__ = Some(map_.next_value()?);
+ }
+ GeneratedField::Seed0 => {
+ if seed0__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("seed0"));
+ }
+ seed0__ =
+
Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
+ ;
+ }
+ GeneratedField::Seed1 => {
+ if seed1__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("seed1"));
+ }
+ seed1__ =
+
Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
+ ;
+ }
+ GeneratedField::Seed2 => {
+ if seed2__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("seed2"));
+ }
+ seed2__ =
+
Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
+ ;
+ }
+ GeneratedField::Seed3 => {
+ if seed3__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("seed3"));
+ }
+ seed3__ =
+
Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
+ ;
+ }
+ GeneratedField::Description => {
+ if description__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("description"));
+ }
+ description__ = Some(map_.next_value()?);
+ }
+ }
+ }
+ Ok(PhysicalHashExprNode {
+ on_columns: on_columns__.unwrap_or_default(),
+ seed0: seed0__.unwrap_or_default(),
+ seed1: seed1__.unwrap_or_default(),
+ seed2: seed2__.unwrap_or_default(),
+ seed3: seed3__.unwrap_or_default(),
+ description: description__.unwrap_or_default(),
+ })
+ }
+ }
+ deserializer.deserialize_struct("datafusion.PhysicalHashExprNode",
FIELDS, GeneratedVisitor)
+ }
+}
impl serde::Serialize for PhysicalHashRepartition {
#[allow(deprecated)]
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
diff --git a/datafusion/proto/src/generated/prost.rs
b/datafusion/proto/src/generated/prost.rs
index 885b61001c..cf343e0258 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1276,7 +1276,7 @@ pub struct PhysicalExtensionNode {
pub struct PhysicalExprNode {
#[prost(
oneof = "physical_expr_node::ExprType",
- tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 18, 19, 20"
+ tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 18, 19, 20,
21"
)]
pub expr_type: ::core::option::Option<physical_expr_node::ExprType>,
}
@@ -1327,6 +1327,8 @@ pub mod physical_expr_node {
Extension(super::PhysicalExtensionExprNode),
#[prost(message, tag = "20")]
UnknownColumn(super::UnknownColumn),
+ #[prost(message, tag = "21")]
+ HashExpr(super::PhysicalHashExprNode),
}
}
#[derive(Clone, PartialEq, ::prost::Message)]
@@ -1517,6 +1519,21 @@ pub struct PhysicalExtensionExprNode {
pub inputs: ::prost::alloc::vec::Vec<PhysicalExprNode>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PhysicalHashExprNode {
+ #[prost(message, repeated, tag = "1")]
+ pub on_columns: ::prost::alloc::vec::Vec<PhysicalExprNode>,
+ #[prost(uint64, tag = "2")]
+ pub seed0: u64,
+ #[prost(uint64, tag = "3")]
+ pub seed1: u64,
+ #[prost(uint64, tag = "4")]
+ pub seed2: u64,
+ #[prost(uint64, tag = "5")]
+ pub seed3: u64,
+ #[prost(string, tag = "6")]
+ pub description: ::prost::alloc::string::String,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FilterExecNode {
#[prost(message, optional, boxed, tag = "1")]
pub input:
::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs
b/datafusion/proto/src/physical_plan/from_proto.rs
index aa02e63a5d..073fdd858c 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -48,6 +48,7 @@ use datafusion_physical_plan::expressions::{
BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr,
LikeExpr, Literal,
NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, in_list,
};
+use datafusion_physical_plan::joins::{HashExpr, SeededRandomState};
use datafusion_physical_plan::windows::{create_window_expr,
schema_add_window_field};
use datafusion_physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
use datafusion_proto_common::common::proto_error;
@@ -399,6 +400,20 @@ pub fn parse_physical_expr(
codec,
)?,
)),
+ ExprType::HashExpr(hash_expr) => {
+ let on_columns =
+ parse_physical_exprs(&hash_expr.on_columns, ctx, input_schema,
codec)?;
+ Arc::new(HashExpr::new(
+ on_columns,
+ SeededRandomState::with_seeds(
+ hash_expr.seed0,
+ hash_expr.seed1,
+ hash_expr.seed2,
+ hash_expr.seed3,
+ ),
+ hash_expr.description.clone(),
+ ))
+ }
ExprType::Extension(extension) => {
let inputs: Vec<Arc<dyn PhysicalExpr>> = extension
.inputs
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs
b/datafusion/proto/src/physical_plan/to_proto.rs
index b06dec592d..9558effb8a 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -41,7 +41,7 @@ use datafusion_physical_plan::expressions::{
BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr,
IsNullExpr,
Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
};
-use datafusion_physical_plan::joins::HashTableLookupExpr;
+use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr};
use datafusion_physical_plan::udaf::AggregateFunctionExpr;
use datafusion_physical_plan::windows::{PlainAggregateWindowExpr,
WindowUDFExpr};
use datafusion_physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
@@ -410,6 +410,20 @@ pub fn serialize_physical_expr(
},
))),
})
+ } else if let Some(expr) = expr.downcast_ref::<HashExpr>() {
+ let (s0, s1, s2, s3) = expr.seeds();
+ Ok(protobuf::PhysicalExprNode {
+ expr_type: Some(protobuf::physical_expr_node::ExprType::HashExpr(
+ protobuf::PhysicalHashExprNode {
+ on_columns: serialize_physical_exprs(expr.on_columns(),
codec)?,
+ seed0: s0,
+ seed1: s1,
+ seed2: s2,
+ seed3: s3,
+ description: expr.description().to_string(),
+ },
+ )),
+ })
} else {
let mut buf: Vec<u8> = vec![];
match codec.try_encode_expr(&value, &mut buf) {
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index f9babeba56..aa54588493 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -2371,3 +2371,36 @@ fn roundtrip_hash_table_lookup_expr_to_lit() ->
Result<()> {
Ok(())
}
+
+#[test]
+fn roundtrip_hash_expr() -> Result<()> {
+ use datafusion::physical_plan::joins::{HashExpr, SeededRandomState};
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Int64, false),
+ Field::new("b", DataType::Utf8, false),
+ ]));
+
+ // Build a HashExpr over columns "a" and "b" with a fixed description and known seeds
+ let on_columns = vec![col("a", &schema)?, col("b", &schema)?];
+ let hash_expr: Arc<dyn PhysicalExpr> = Arc::new(HashExpr::new(
+ on_columns,
+ SeededRandomState::with_seeds(0, 1, 2, 3), // arbitrary random seeds
for testing
+ "test_hash".to_string(),
+ ));
+
+ // Embed the hash in a plan: compare its value against a literal and use that as the
+ // FilterExec predicate (the comparison hash_expr > 0 always evaluates to a boolean)
+ let filter_expr = binary(hash_expr, Operator::Gt, lit(0u64), &schema)?;
+ let filter = Arc::new(FilterExec::try_new(
+ filter_expr,
+ Arc::new(EmptyExec::new(schema)),
+ )?);
+
+ // Before the roundtrip, confirm the Debug output renders the description, columns and seeds
+ assert!(
+ format!("{filter:?}").contains("test_hash(a@0, b@1, [0,1,2,3])"),
+ "Debug string missing seeds: {filter:?}"
+ );
+ roundtrip_test(filter)
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]