gene-bordegaray commented on code in PR #20331:
URL: https://github.com/apache/datafusion/pull/20331#discussion_r2801910997


##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -1454,21 +1481,86 @@ pub fn ensure_distribution(
         plan.with_new_children(children_plans)?
     };
 
+    // For partitioned hash joins, decide dynamic filter routing mode.
+    //
+    // Dynamic filtering requires matching partitioning schemes on both sides:
+    // - PartitionIndex: Both sides use file-grouped partitioning (value-based).
+    //   Partition i on build corresponds to partition i on probe by partition value.
+    // - CaseHash: Both sides use hash repartitioning (hash-based).
+    //   Uses CASE expression with hash(row) % N to route to correct partition filter.
+    //
+    // NOTE: If partitioning schemes are misaligned (one file-grouped, one hash-repartitioned),
+    // the partitioned join itself is incorrect.
+    // Partition assignments don't match:
+    // - File-grouped: partition 0 = all rows where column="A" (value-based)
+    // - Hash-repartitioned: partition 0 = all rows where hash(column) % N == 0 (hash-based)
+    // These are incompatible, so the join will miss matching rows.
+    plan = if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>()
+        && matches!(hash_join.mode, PartitionMode::Partitioned)
+    {
+        let routing_mode = match (
+            children[0].data.repartitioned,
+            children[1].data.repartitioned,
+        ) {
+            (false, false) => DynamicFilterRoutingMode::PartitionIndex,
+            (true, true) => DynamicFilterRoutingMode::CaseHash,
+            _ => {
+                // Misaligned partitioning schemes
+                return plan_err!(
+                    "Partitioned hash join has incompatible partitioning schemes: \
+                     left side is {}, right side is {}.",
+                    if children[0].data.repartitioned {
+                        "hash-repartitioned"
+                    } else {
+                        "file-grouped"
+                    },
+                    if children[1].data.repartitioned {
+                        "hash-repartitioned"
+                    } else {
+                        "file-grouped"
+                    }

Review Comment:
   I didn't love this; maybe we could have a helper that maps the flag to its label and makes this cleaner.
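   One possible shape for such a helper, sketched as standalone Rust so it compiles without DataFusion. Everything here is hypothetical: `RoutingMode` stands in for `DynamicFilterRoutingMode`, `partitioning_scheme` and `routing_mode` are illustrative names, and a plain `String` error replaces `plan_err!`. It only shows how the duplicated `if ... { "hash-repartitioned" } else { "file-grouped" }` branches could collapse into one mapping.

```rust
/// Hypothetical stand-in for `DynamicFilterRoutingMode`; mirrors only the
/// two variants used in the diff.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RoutingMode {
    PartitionIndex,
    CaseHash,
}

/// Hypothetical helper: maps a child's `repartitioned` flag to the label
/// used in the error message, so the mapping lives in one place.
pub fn partitioning_scheme(repartitioned: bool) -> &'static str {
    if repartitioned {
        "hash-repartitioned"
    } else {
        "file-grouped"
    }
}

/// Picks the routing mode from the left/right `repartitioned` flags.
/// Returns a `String` error instead of `plan_err!` (assumption, to keep
/// the sketch dependency-free).
pub fn routing_mode(left: bool, right: bool) -> Result<RoutingMode, String> {
    match (left, right) {
        (false, false) => Ok(RoutingMode::PartitionIndex),
        (true, true) => Ok(RoutingMode::CaseHash),
        // Misaligned partitioning schemes: report both sides via the helper.
        _ => Err(format!(
            "Partitioned hash join has incompatible partitioning schemes: \
             left side is {}, right side is {}.",
            partitioning_scheme(left),
            partitioning_scheme(right)
        )),
    }
}
```

   In the real rule, the `_` arm would keep using `plan_err!` and just call the helper for each child's `data.repartitioned`.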



-- 
This is an automated message from the Apache Git Service.