gabotechs commented on code in PR #18451:
URL: https://github.com/apache/datafusion/pull/18451#discussion_r2542084597


##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -418,6 +418,10 @@ enum BatchPartitionerState {
     },
 }
 
+/// Fixed seed used for hash repartitioning to ensure consistent behavior across
+/// executions and runs.
+pub const REPARTITION_HASH_SEED: [u64; 4] = [0u64; 4];

Review Comment:
   Note that `RandomState::with_seeds()` is a `const` function, so a cleaner option that comes to mind is:
   ```rust
   pub const RANDOM_STATE: RandomState = RandomState::with_seeds(0, 0, 0, 0);
   ```
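   For illustration, the sketch below mimics that pattern using only std, since `ahash` (which provides the `RandomState::with_seeds` used above) is an external crate. `FixedSeedState` is a hypothetical stand-in, not a DataFusion or ahash API. Because its constructor is a `const fn`, the seeded hasher builder can live in a `pub const` instead of being threaded through function arguments, and every use of the constant hashes identically.

   ```rust
   use std::collections::hash_map::DefaultHasher;
   use std::hash::{BuildHasher, Hash};

   /// Hypothetical stand-in for `ahash::RandomState`: a hasher builder whose
   /// seeds are fixed when the value is constructed.
   #[derive(Clone, Copy, Debug, PartialEq, Eq)]
   pub struct FixedSeedState {
       seeds: [u64; 4],
   }

   impl FixedSeedState {
       /// `const fn`, so the builder can be stored in a `const` item.
       pub const fn with_seeds(k0: u64, k1: u64, k2: u64, k3: u64) -> Self {
           Self { seeds: [k0, k1, k2, k3] }
       }
   }

   impl BuildHasher for FixedSeedState {
       type Hasher = DefaultHasher;

       fn build_hasher(&self) -> DefaultHasher {
           // Every `DefaultHasher::new()` starts in the same state; feeding the
           // seeds in first makes differently seeded builders hash differently.
           let mut hasher = DefaultHasher::new();
           self.seeds.hash(&mut hasher);
           hasher
       }
   }

   /// Single shared definition: every caller hashes with the same state.
   pub const REPARTITION_RANDOM_STATE: FixedSeedState = FixedSeedState::with_seeds(0, 0, 0, 0);
   ```

   Calling `REPARTITION_RANDOM_STATE.hash_one(key)` (via `BuildHasher::hash_one`, Rust 1.71+) from any module then yields the same value for the same key within a build, with no `RandomState` parameter that a caller could get wrong.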
   



##########
datafusion/physical-plan/src/joins/hash_join/stream.rs:
##########
@@ -405,24 +408,34 @@ impl HashJoinStream {
             .get_shared(cx))?;
         build_timer.done();
 
-        // Handle dynamic filter bounds accumulation
+        // Handle dynamic filter build-side information accumulation
         //
         // Dynamic filter coordination between partitions:
-        // Report bounds to the accumulator which will handle synchronization and filter updates
-        if let Some(ref bounds_accumulator) = self.bounds_accumulator {
-            let bounds_accumulator = Arc::clone(bounds_accumulator);
+        // Report hash maps (Partitioned mode) or bounds (CollectLeft mode) to the accumulator
+        // which will handle synchronization and filter updates
+        if let Some(ref build_accumulator) = self.build_accumulator {
+            let build_accumulator = Arc::clone(build_accumulator);
 
             let left_side_partition_id = match self.mode {
                 PartitionMode::Partitioned => self.partition,
                 PartitionMode::CollectLeft => 0,
                 PartitionMode::Auto => unreachable!("PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!"),
             };
 
-            let left_data_bounds = left_data.bounds.clone();
-            self.bounds_waiter = Some(OnceFut::new(async move {
-                bounds_accumulator
-                    .report_partition_bounds(left_side_partition_id, left_data_bounds)
-                    .await
+            let build_data = match self.mode {
+                PartitionMode::Partitioned => PartitionBuildDataReport::Partitioned {
+                    partition_id: left_side_partition_id,
+                    bounds: left_data.bounds.clone(),

Review Comment:
   Right now this is completely acceptable, as only a handful of ScalarValues are cloned, but I imagine that in the future this could mean cloning a large `IN (lit1, lit2, lit3, lit4, ...)` expression.
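   One common way to keep that future cost bounded is to wrap the reported payload in an `Arc`, so that handing it to the accumulator clones a pointer rather than the literals themselves. The sketch below is hypothetical: `BuildSideData` (a `Vec<i64>` standing in for `ScalarValue`s or a large `IN` list) and `PartitionReport` are illustrative names, not DataFusion types.

   ```rust
   use std::sync::Arc;

   /// Hypothetical payload standing in for bounds or a large `IN (...)` list.
   #[derive(Debug)]
   pub struct BuildSideData {
       pub literals: Vec<i64>,
   }

   /// Hypothetical report sent to the accumulator; the payload is shared, not copied.
   #[derive(Debug, Clone)]
   pub struct PartitionReport {
       pub partition_id: usize,
       pub data: Arc<BuildSideData>,
   }

   pub fn make_report(partition_id: usize, data: &Arc<BuildSideData>) -> PartitionReport {
       PartitionReport {
           partition_id,
           // Only bumps the reference count; the literals are not duplicated.
           data: Arc::clone(data),
       }
   }
   ```

   Cloning a `PartitionReport` stays O(1) regardless of how many literals the payload holds.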



##########
datafusion/physical-plan/src/joins/hash_join/exec.rs:
##########
@@ -1416,6 +1409,7 @@ async fn collect_left_input(
 
     // Use `u32` indices for the JoinHashMap when num_rows ≤ u32::MAX, otherwise use the
     // `u64` indice variant
+    // Arc is used instead of Box to allow sharing with SharedBuildAccumulator for hash map pushdown

Review Comment:
   Actually, I think this comment is obsolete now; it can probably be removed.



##########
datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs:
##########
@@ -137,19 +215,20 @@ impl SharedBoundsAccumulator {
     ///   `collect_build_side` once. Expected calls = number of build partitions.
     ///
     /// - **Auto**: Placeholder mode resolved during optimization. Uses 1 as safe default since
-    ///   the actual mode will be determined and a new bounds_accumulator created before execution.
+    ///   the actual mode will be determined and a new accumulator created before execution.
     ///
     /// ## Why This Matters
     ///
     /// We cannot build a partial filter from some partitions - it would incorrectly eliminate
-    /// valid join results. We must wait until we have complete bounds information from ALL
+    /// valid join results. We must wait until we have complete information from ALL
     /// relevant partitions before updating the dynamic filter.
     pub(crate) fn new_from_partition_mode(
         partition_mode: PartitionMode,
         left_child: &dyn ExecutionPlan,
         right_child: &dyn ExecutionPlan,
         dynamic_filter: Arc<DynamicFilterPhysicalExpr>,
         on_right: Vec<PhysicalExprRef>,
+        repartition_random_state: RandomState,

Review Comment:
   Does it make sense for this to be anything other than `RandomState::with_seeds(0, 0, 0, 0)`? I'd bet not, but correct me if I'm wrong.
   
   I wonder if, for dealing with random states, it makes more sense to store them in constants:
   ```rust
   pub const REPARTITION_RANDOM_STATE: RandomState = RandomState::with_seeds(0, 0, 0, 0);
   ```
   and avoid accepting them as arguments as much as possible. I imagine the kinds of bugs that can appear from passing the wrong `RandomState` could be very hard to debug.
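   To make that failure mode concrete, here is a toy sketch, assuming (for illustration only) that rows are routed to partitions as `hash % num_partitions`. It uses std's deterministic `BuildHasherDefault<DefaultHasher>` in place of the fixed-seed `RandomState`; std's `RandomState`, which is initialized with random keys per instance, can play the role of "the wrong state". `partition_for` and `mismatches` are hypothetical helpers, not DataFusion functions.

   ```rust
   use std::collections::hash_map::DefaultHasher;
   use std::hash::{BuildHasher, BuildHasherDefault, Hash};

   /// Deterministic builder: every instance hashes identically.
   pub type FixedState = BuildHasherDefault<DefaultHasher>;

   /// Partition a key would be routed to, assuming `hash % num_partitions` routing.
   pub fn partition_for<S: BuildHasher, K: Hash>(state: &S, key: &K, num_partitions: u64) -> u64 {
       state.hash_one(key) % num_partitions
   }

   /// Counts keys in `0..n` whose partition differs between two hasher builders.
   pub fn mismatches<A: BuildHasher, B: BuildHasher>(a: &A, b: &B, n: u64, num_partitions: u64) -> usize {
       (0..n)
           .filter(|k| partition_for(a, k, num_partitions) != partition_for(b, k, num_partitions))
           .count()
   }
   ```

   Two `FixedState` values never disagree, while a producer and a consumer built from different states disagree on a large share of keys, with no error raised anywhere: the join would just silently drop or misroute rows.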


