adriangb commented on code in PR #20331:
URL: https://github.com/apache/datafusion/pull/20331#discussion_r2818058399


##########
datafusion/common/src/config.rs:
##########
@@ -996,6 +996,39 @@ config_namespace! {
         ///
         /// Note: This may reduce parallelism, rooting from the I/O level, if the number of distinct
         /// partitions is less than the target_partitions.
+        ///
+        /// Note for partitioned hash join dynamic filtering:
+        /// preserving file partitions can allow partition-index routing (`i -> i`) instead of
+        /// CASE-hash routing, but this assumes build/probe partition indices stay aligned for
+        /// partition hash join / dynamic filter consumers.
+        ///
+        /// Misaligned Partitioned Hash Join Example:

Review Comment:
   Is this something DataFusion users have to think about? I.e. is this 
something a user can mess up or would it only happen if there was a bug in 
DataFusion?
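
   To make the question concrete, here is a minimal toy model of what misalignment would do under partition-index routing (`i -> i`). It does not use DataFusion's types or APIs: `Bounds`, `bounds_of`, `build_filters` and `probe_with_index_routing` are hypothetical names, and the min/max range filter is an assumption chosen only for illustration.

```rust
// Toy model only: these types and functions are hypothetical, not DataFusion APIs.

/// Per-partition dynamic filter: the inclusive key range seen on the build side.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Bounds {
    min: i64,
    max: i64,
}

/// Key range of one build partition, or `None` if the partition is empty.
fn bounds_of(keys: &[i64]) -> Option<Bounds> {
    let min = *keys.iter().min()?;
    let max = *keys.iter().max()?;
    Some(Bounds { min, max })
}

/// One filter per build partition, in partition order.
fn build_filters(build: &[Vec<i64>]) -> Vec<Option<Bounds>> {
    build.iter().map(|keys| bounds_of(keys)).collect()
}

/// Partition-index routing (`i -> i`): probe partition `i` is filtered only by
/// the filter derived from build partition `i`.
fn probe_with_index_routing(
    filters: &[Option<Bounds>],
    probe: &[Vec<i64>],
) -> Vec<Vec<i64>> {
    probe
        .iter()
        .enumerate()
        .map(|(i, keys)| {
            let bounds = filters.get(i).copied().flatten();
            keys.iter()
                .copied()
                .filter(|k| match bounds {
                    Some(b) => b.min <= *k && *k <= b.max,
                    // Empty or missing build partition: nothing can match.
                    None => false,
                })
                .collect()
        })
        .collect()
}
```

   With build partitions `[[2, 4], [1, 3]]`, an aligned probe `[[4, 6], [1, 5]]` keeps keys 4 and 1. If the probe partitions arrive swapped as `[[1, 5], [4, 6]]`, both keys are dropped even though they exist on the build side, so in this model misalignment silently loses join matches rather than raising an error.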



##########
datafusion/physical-plan/src/joins/hash_join/exec.rs:
##########
@@ -809,6 +831,23 @@ impl HashJoinExec {
         self.dynamic_filter.as_ref().map(|df| &df.filter)
     }
 
+    /// Determines whether partition-index routing should be used instead of CASE hash routing.
+    ///
+    /// Enabled when:
+    /// 1. The join is in `Partitioned` mode
+    /// 2. The optimizer selected `DynamicFilterRoutingMode::PartitionIndex`.
+    ///
+    /// Note: In CollectLeft mode, even when the probe side (right) has misaligned partitionings,
+    /// this works correctly because there is a single shared hash table built from the coalesced
+    /// left side. All probe partitions use the same hash table for lookups. This applies to
+    /// dynamic filters as well, the single build-side filter is sufficient for all probe
+    /// partitions.
+    fn should_use_partition_index(&self) -> bool {
+        matches!(self.mode, PartitionMode::Partitioned)
+            && self.dynamic_filter_routing_mode
+                == DynamicFilterRoutingMode::PartitionIndex

Review Comment:
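   Would these two conditions ever disagree? That is, could the optimizer
select `DynamicFilterRoutingMode::PartitionIndex` when the join is not in
`Partitioned` mode?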
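
   As a rough way to frame this, the sketch below enumerates every combination of the two settings and lists the ones where the extra `mode` check changes the result. The enums are local stand-ins, not the real DataFusion definitions; in particular the `CaseHash` variant name is an assumption, since only `PartitionIndex` appears in the diff.

```rust
// Local stand-ins for illustration; not the real DataFusion enums.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PartitionMode {
    Partitioned,
    CollectLeft,
    Auto,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RoutingMode {
    CaseHash, // assumed name for the non-index routing variant
    PartitionIndex,
}

/// Mirrors the predicate in the diff: both conditions must hold.
fn should_use_partition_index(mode: PartitionMode, routing: RoutingMode) -> bool {
    matches!(mode, PartitionMode::Partitioned) && routing == RoutingMode::PartitionIndex
}

/// Combinations where checking `mode` gives a different answer than
/// checking the routing mode alone.
fn states_where_mode_check_matters() -> Vec<(PartitionMode, RoutingMode)> {
    let modes = [
        PartitionMode::Partitioned,
        PartitionMode::CollectLeft,
        PartitionMode::Auto,
    ];
    let routings = [RoutingMode::CaseHash, RoutingMode::PartitionIndex];
    let mut out = Vec::new();
    for mode in modes {
        for routing in routings {
            let routing_only = routing == RoutingMode::PartitionIndex;
            if should_use_partition_index(mode, routing) != routing_only {
                out.push((mode, routing));
            }
        }
    }
    out
}
```

   The only states that differ are `PartitionIndex` paired with a non-`Partitioned` mode. If the optimizer can never produce those states, the `mode` check is redundant and the invariant could be asserted instead; if it can, the check is doing real work and is worth a comment.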



##########
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##########
@@ -450,6 +531,25 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
     }
 }
 
+/// Snapshot a `PhysicalExpr` tree, replacing any [`DynamicFilterPhysicalExpr`] that
+/// has per-partition data with its partition-specific filter expression.
+/// If a `DynamicFilterPhysicalExpr` does not have partitioned data, it is left unchanged.
+pub fn snapshot_physical_expr_for_partition(
+    expr: Arc<dyn PhysicalExpr>,
+    partition: usize,
+) -> Result<Arc<dyn PhysicalExpr>> {

Review Comment:
   This is an interesting approach, but it feels like we are shoehorning the
concept of partitions into `PhysicalExpr` through the `snapshot()` API, which
was never intended for this use case. There is probably a much cleaner way to
introduce the concept of a partition to `PhysicalExpr`.
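
   For readers following along, the behavior described in the doc comment can be modeled on a toy expression type. This is only a sketch of the tree rewrite under review, not the PR's implementation and not a proposal for the cleaner alternative: `Expr`, its `Dynamic` variant and the helper constructors are hypothetical, and leaving an out-of-range partition unchanged is an assumption (the real function returns a `Result`).

```rust
use std::sync::Arc;

// Toy expression tree; a hypothetical stand-in for `Arc<dyn PhysicalExpr>`.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(i64),
    Gt(Arc<Expr>, Arc<Expr>),
    And(Arc<Expr>, Arc<Expr>),
    /// Stand-in for a dynamic filter: a current expression plus optional
    /// per-partition expressions.
    Dynamic {
        current: Arc<Expr>,
        per_partition: Option<Vec<Arc<Expr>>>,
    },
}

fn col(name: &str) -> Arc<Expr> {
    Arc::new(Expr::Column(name.to_string()))
}
fn lit(v: i64) -> Arc<Expr> {
    Arc::new(Expr::Literal(v))
}
fn gt(l: Arc<Expr>, r: Arc<Expr>) -> Arc<Expr> {
    Arc::new(Expr::Gt(l, r))
}
fn and(l: Arc<Expr>, r: Arc<Expr>) -> Arc<Expr> {
    Arc::new(Expr::And(l, r))
}

/// Replace every `Dynamic` node that has per-partition data with the
/// expression for `partition`; leave all other nodes unchanged.
fn snapshot_for_partition(expr: &Arc<Expr>, partition: usize) -> Arc<Expr> {
    match expr.as_ref() {
        Expr::Dynamic { per_partition: Some(parts), .. } => match parts.get(partition) {
            Some(p) => snapshot_for_partition(p, partition),
            // Assumption: an out-of-range partition leaves the node as-is.
            None => Arc::clone(expr),
        },
        Expr::Gt(l, r) => gt(
            snapshot_for_partition(l, partition),
            snapshot_for_partition(r, partition),
        ),
        Expr::And(l, r) => and(
            snapshot_for_partition(l, partition),
            snapshot_for_partition(r, partition),
        ),
        // Columns, literals, and dynamic nodes without partitioned data.
        _ => Arc::clone(expr),
    }
}
```

   Note how the partition index has to be threaded through every recursive call even though only the `Dynamic` node uses it, which is one way to read the concern about shoehorning partitions into the expression API.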



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

