adriangb commented on code in PR #20331:
URL: https://github.com/apache/datafusion/pull/20331#discussion_r2818058399
##########
datafusion/common/src/config.rs:
##########
@@ -996,6 +996,39 @@ config_namespace! {
///
/// Note: This may reduce parallelism, rooting from the I/O level, if the number of distinct
/// partitions is less than the target_partitions.
+ ///
+ /// Note for partitioned hash join dynamic filtering:
+ /// preserving file partitions can allow partition-index routing (`i -> i`) instead of
+ /// CASE-hash routing, but this assumes build/probe partition indices stay aligned for
+ /// partition hash join / dynamic filter consumers.
+ ///
+ /// Misaligned Partitioned Hash Join Example:
Review Comment:
Is this something DataFusion users have to think about? I.e. is this
something a user can mess up or would it only happen if there was a bug in
DataFusion?
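
To make the question concrete, here is a rough sketch of how I read the two routing modes. It is plain Rust with toy types, not DataFusion APIs: `BuildKeys`, `hash_partition`, and the helper names are all hypothetical, and the modulo "hash" is only a stand-in for whatever hashing the join actually uses.

```rust
use std::collections::HashSet;

/// Toy stand-in for a per-partition dynamic filter: the set of join keys
/// seen on the build side of that partition. (Hypothetical, not a DataFusion type.)
type BuildKeys = HashSet<i64>;

/// Toy partitioner. Assumption: real code hashes the join key columns.
fn hash_partition(key: i64, num_partitions: usize) -> usize {
    key.rem_euclid(num_partitions as i64) as usize
}

/// Build one filter per partition by hash-partitioning the build keys.
fn build_filters(build_keys: &[i64], num_partitions: usize) -> Vec<BuildKeys> {
    let mut filters = vec![BuildKeys::new(); num_partitions];
    for &key in build_keys {
        filters[hash_partition(key, num_partitions)].insert(key);
    }
    filters
}

/// Hash-style routing: pick the filter from the key itself, regardless of
/// which probe partition the row happens to be in.
fn passes_hash_routing(filters: &[BuildKeys], key: i64) -> bool {
    filters[hash_partition(key, filters.len())].contains(&key)
}

/// Partition-index routing (`i -> i`): probe partition `i` only consults
/// filter `i`, so it is only correct if build and probe indices are aligned.
fn passes_index_routing(filters: &[BuildKeys], probe_partition: usize, key: i64) -> bool {
    filters[probe_partition].contains(&key)
}
```

With build keys `[1, 2, 3, 4]` and two partitions, key `3` lands in filter 1. If a probe row with key `3` shows up in probe partition 0, index routing filters it out even though it has a match, while hash routing keeps it. That is the failure I am asking about: can a user's configuration produce it, or only a planner bug?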
##########
datafusion/physical-plan/src/joins/hash_join/exec.rs:
##########
@@ -809,6 +831,23 @@ impl HashJoinExec {
self.dynamic_filter.as_ref().map(|df| &df.filter)
}
+ /// Determines whether partition-index routing should be used instead of CASE hash routing.
+ ///
+ /// Enabled when:
+ /// 1. The join is in `Partitioned` mode
+ /// 2. The optimizer selected `DynamicFilterRoutingMode::PartitionIndex`.
+ ///
+ /// Note: In CollectLeft mode, even when the probe side (right) has misaligned partitioning,
+ /// this works correctly because there is a single shared hash table built from the coalesced
+ /// left side. All probe partitions use the same hash table for lookups. This applies to
+ /// dynamic filters as well, the single build-side filter is sufficient for all probe
+ /// partitions.
+ fn should_use_partition_index(&self) -> bool {
+ matches!(self.mode, PartitionMode::Partitioned)
+ && self.dynamic_filter_routing_mode
+ == DynamicFilterRoutingMode::PartitionIndex
Review Comment:
Would these two ever not match?
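
Spelling out what I mean, as a minimal sketch with toy enums: these mirror the names in the diff but are not the real types. The `CaseHash` variant name is my assumption, and the real `PartitionMode` has more variants than the two shown. The helper lists the combinations where the mode check actually changes the answer.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum PartitionMode {
    Partitioned,
    CollectLeft,
}

#[derive(Clone, Copy, Debug, PartialEq)]
enum DynamicFilterRoutingMode {
    CaseHash, // assumed name for the non-index routing variant
    PartitionIndex,
}

/// Same predicate as in the diff, written as a free function over the toy enums.
fn should_use_partition_index(
    mode: PartitionMode,
    routing: DynamicFilterRoutingMode,
) -> bool {
    matches!(mode, PartitionMode::Partitioned)
        && routing == DynamicFilterRoutingMode::PartitionIndex
}

/// Combinations where looking at the routing mode alone would give a
/// different answer than the combined check.
fn disagreeing_combinations() -> Vec<(PartitionMode, DynamicFilterRoutingMode)> {
    let modes = [PartitionMode::Partitioned, PartitionMode::CollectLeft];
    let routings = [
        DynamicFilterRoutingMode::CaseHash,
        DynamicFilterRoutingMode::PartitionIndex,
    ];
    let mut out = Vec::new();
    for &mode in &modes {
        for &routing in &routings {
            let routing_only = routing == DynamicFilterRoutingMode::PartitionIndex;
            if routing_only != should_use_partition_index(mode, routing) {
                out.push((mode, routing));
            }
        }
    }
    out
}
```

In this toy model the only disagreeing case is `CollectLeft` paired with `PartitionIndex`. If the optimizer never produces that pairing, the mode check is redundant; if it can, a comment explaining when would help.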
##########
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##########
@@ -450,6 +531,25 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
}
}
+/// Snapshot a `PhysicalExpr` tree, replacing any [`DynamicFilterPhysicalExpr`] that
+/// has per-partition data with its partition-specific filter expression.
+/// If a `DynamicFilterPhysicalExpr` does not have partitioned data, it is left unchanged.
+pub fn snapshot_physical_expr_for_partition(
+ expr: Arc<dyn PhysicalExpr>,
+ partition: usize,
+) -> Result<Arc<dyn PhysicalExpr>> {
Review Comment:
This is an interesting way to go about it. It does feel to me like
we are trying to shoehorn the concept of partitions into `PhysicalExpr` through
the `snapshot()` API, which was never intended for this use case. I feel there
is probably a much cleaner way to introduce the concept of a partition to
`PhysicalExpr`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]