gabotechs commented on code in PR #19304:
URL: https://github.com/apache/datafusion/pull/19304#discussion_r2618356865


##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -339,4 +407,173 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_subset_partitioning_normal() -> Result<()> {

Review Comment:
   I usually advocate for putting each test case in its own test function, 
so that each one can be reported and run in isolation.
   
   If you think these tests would be easy to refactor that way, awesome. 
Otherwise, +1 to leaving them as is; they are pretty good.
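   
   A minimal sketch of the shape I have in mind, one `#[test]` function per 
scenario. The helper `increases_partition_count` is a hypothetical stand-in 
used only for illustration, not code from this PR:
   
   ```rust
   /// Hypothetical helper (not from the PR): true when repartitioning to
   /// `target` would raise the partition count above `current`.
   pub fn increases_partition_count(current: usize, target: usize) -> bool {
       current < target
   }

   #[cfg(test)]
   mod tests {
       use super::*;

       // Each scenario gets its own function, so a failure names the exact
       // case and `cargo test <name>` can run it in isolation.
       #[test]
       fn fewer_partitions_than_target() {
           assert!(increases_partition_count(2, 8));
       }

       #[test]
       fn same_partitions_as_target() {
           assert!(!increases_partition_count(8, 8));
       }

       #[test]
       fn more_partitions_than_target() {
           assert!(!increases_partition_count(16, 8));
       }
   }
   ```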



##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -889,32 +889,41 @@ fn add_roundrobin_on_top(
 /// * `hash_exprs`: Stores Physical Exprs that are used during hashing.
 /// * `n_target`: desired target partition number, if partition number of the
 ///   current executor is less than this value. Partition number will be increased.
+/// * `allow_subset`: Whether to allow subset partitioning logic in satisfaction checks.
+///   Set to `false` for partitioned hash joins to ensure exact hash matching.
 ///
 /// # Returns
 ///
 /// A [`Result`] object that contains new execution plan where the desired
 /// distribution is satisfied by adding a Hash repartition.
 fn add_hash_on_top(
     input: DistributionContext,
-    hash_exprs: Vec<Arc<dyn PhysicalExpr>>,
+    hash_exprs: &[Arc<dyn PhysicalExpr>],

Review Comment:
   Since `hash_exprs` is cloned immediately in `let dist = 
Distribution::HashPartitioned(hash_exprs.to_owned());`, I'd argue that keeping 
this argument as an owned value, as before, is slightly better.
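   
   To illustrate the difference, here is a self-contained sketch. 
`PhysicalExpr`, `Column`, and `Distribution` below are simplified stand-ins, 
and `dist_from_slice` / `dist_from_vec` are hypothetical names, not the real 
DataFusion items:
   
   ```rust
   use std::sync::Arc;

   // Simplified stand-ins so the sketch compiles on its own.
   pub trait PhysicalExpr {}
   pub struct Column;
   impl PhysicalExpr for Column {}

   pub enum Distribution {
       HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),
   }

   /// Borrowed parameter: the function has to allocate a new Vec and clone
   /// every Arc, even when the caller no longer needs its own copy.
   pub fn dist_from_slice(hash_exprs: &[Arc<dyn PhysicalExpr>]) -> Distribution {
       Distribution::HashPartitioned(hash_exprs.to_owned())
   }

   /// Owned parameter: the Vec is moved straight into the variant, and the
   /// caller decides whether a clone is needed at the call site.
   pub fn dist_from_vec(hash_exprs: Vec<Arc<dyn PhysicalExpr>>) -> Distribution {
       Distribution::HashPartitioned(hash_exprs)
   }
   ```
   
   With the owned signature, a caller that still needs the expressions 
afterwards can pass `exprs.clone()` explicitly, and one that does not pays 
nothing.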



##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -889,32 +889,41 @@ fn add_roundrobin_on_top(
 /// * `hash_exprs`: Stores Physical Exprs that are used during hashing.
 /// * `n_target`: desired target partition number, if partition number of the
 ///   current executor is less than this value. Partition number will be increased.
+/// * `allow_subset`: Whether to allow subset partitioning logic in satisfaction checks.
+///   Set to `false` for partitioned hash joins to ensure exact hash matching.
 ///
 /// # Returns
 ///
 /// A [`Result`] object that contains new execution plan where the desired
 /// distribution is satisfied by adding a Hash repartition.
 fn add_hash_on_top(
     input: DistributionContext,
-    hash_exprs: Vec<Arc<dyn PhysicalExpr>>,
+    hash_exprs: &[Arc<dyn PhysicalExpr>],
     n_target: usize,
+    allow_subset: bool,

Review Comment:
   Usually, when a function accepts a parameter that is only relevant for a 
very specific combination of inputs (`Distribution::HashPartitioned` and 
`Partitioning::Hash` in this case) and irrelevant in most other cases, it 
signals that there might be a better way of integrating the parameter into the 
existing data model.
   
   At first sight, one idea that comes naturally to mind is, rather than 
threading `allow_subset: bool` through as a side parameter, to bake it into the 
entity that is supposed to model hash partition requirements:
   
   ```rust
   #[derive(Debug, Clone)]
   pub enum Distribution {
       /// Unspecified distribution
       UnspecifiedDistribution,
       /// A single partition is required
       SinglePartition,
       /// Requires children to be distributed in such a way that the same
       /// values of the keys end up in the same partition
   +   HashPartitioned {
   +       expr: Vec<Arc<dyn PhysicalExpr>>,
   +       allow_subset: bool,
   +   },
   }
   ```
   
   I was not in the battlefield coding this, so my perspective might be a bit 
tilted.
   
   Do you think something like this would make sense?
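   
   To make the idea more concrete, here is a rough, self-contained sketch of 
how a satisfaction check could read the flag from the variant itself. The 
types are simplified stand-ins, `hash_satisfies` is a hypothetical name, and 
comparing expressions by name is an assumption made only for this example:
   
   ```rust
   use std::sync::Arc;

   // Simplified stand-ins; the real trait and enum carry much more.
   pub trait PhysicalExpr {
       fn name(&self) -> &str;
   }
   pub struct Column(pub &'static str);
   impl PhysicalExpr for Column {
       fn name(&self) -> &str {
           self.0
       }
   }

   pub enum Distribution {
       UnspecifiedDistribution,
       SinglePartition,
       HashPartitioned {
           expr: Vec<Arc<dyn PhysicalExpr>>,
           allow_subset: bool,
       },
   }

   /// Hypothetical check: does a hash partitioning on `partition_exprs`
   /// satisfy `required`? No side parameter is needed, because the
   /// requirement itself says whether a subset match is acceptable.
   pub fn hash_satisfies(
       partition_exprs: &[Arc<dyn PhysicalExpr>],
       required: &Distribution,
   ) -> bool {
       match required {
           Distribution::HashPartitioned { expr, allow_subset } => {
               let exact = partition_exprs.len() == expr.len()
                   && partition_exprs
                       .iter()
                       .zip(expr)
                       .all(|(p, r)| p.name() == r.name());
               let subset = !partition_exprs.is_empty()
                   && partition_exprs
                       .iter()
                       .all(|p| expr.iter().any(|r| r.name() == p.name()));
               exact || (*allow_subset && subset)
           }
           // Other variants are out of scope for this sketch.
           _ => false,
       }
   }
   ```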



##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -148,51 +170,87 @@ impl Partitioning {
         }
     }
 
-    /// Returns true when the guarantees made by this [`Partitioning`] are sufficient to
-    /// satisfy the partitioning scheme mandated by the `required` [`Distribution`].
+    /// Returns true if `subset_exprs` is a subset of `superset_exprs`.

Review Comment:
   I'd extend the comment here to state, for `Hash(a, b)` and `Hash(a)`, which 
one is the subset and which one is the superset. I bet people unfamiliar with 
this topic would appreciate it.
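   
   Something along these lines, as a sketch. The function name 
`is_subset_of`, the use of `&str` in place of physical expressions, and the 
rejection of an empty subset are all assumptions of this example rather than 
the PR's actual code:
   
   ```rust
   /// Returns true if `subset_exprs` is a subset of `superset_exprs`.
   ///
   /// Example: for an existing partitioning `Hash(a)` and a requirement
   /// `Hash(a, b)`, `[a]` is the subset and `[a, b]` is the superset.
   /// Rows that agree on `(a, b)` also agree on `a`, so they already live
   /// in the same partition. The reverse does not hold: rows that agree
   /// on `a` alone may differ on `b`.
   pub fn is_subset_of(subset_exprs: &[&str], superset_exprs: &[&str]) -> bool {
       // Assumption of this sketch: an empty subset is not a match.
       !subset_exprs.is_empty()
           && subset_exprs.iter().all(|e| superset_exprs.contains(e))
   }
   ```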



##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -889,32 +889,41 @@ fn add_roundrobin_on_top(
 /// * `hash_exprs`: Stores Physical Exprs that are used during hashing.
 /// * `n_target`: desired target partition number, if partition number of the
 ///   current executor is less than this value. Partition number will be increased.
+/// * `allow_subset`: Whether to allow subset partitioning logic in satisfaction checks.
+///   Set to `false` for partitioned hash joins to ensure exact hash matching.
 ///
 /// # Returns
 ///
 /// A [`Result`] object that contains new execution plan where the desired
 /// distribution is satisfied by adding a Hash repartition.
 fn add_hash_on_top(
     input: DistributionContext,
-    hash_exprs: Vec<Arc<dyn PhysicalExpr>>,
+    hash_exprs: &[Arc<dyn PhysicalExpr>],
     n_target: usize,
+    allow_subset: bool,
 ) -> Result<DistributionContext> {
     // Early return if hash repartition is unnecessary
     // `RepartitionExec: partitioning=Hash([...], 1), input_partitions=1` is unnecessary.
     if n_target == 1 && input.plan.output_partitioning().partition_count() == 1 {
         return Ok(input);
     }
 
-    let dist = Distribution::HashPartitioned(hash_exprs);
-    let satisfied = input
-        .plan
-        .output_partitioning()
-        .satisfy(&dist, input.plan.equivalence_properties());
+    let dist = Distribution::HashPartitioned(hash_exprs.to_owned());
+    let satisfaction = input.plan.output_partitioning().satisfy(
+        &dist,
+        input.plan.equivalence_properties(),
+        allow_subset,
+    );
 
     // Add hash repartitioning when:
     // - The hash distribution requirement is not satisfied, or
-    // - We can increase parallelism by adding hash partitioning.
-    if !satisfied || n_target > input.plan.output_partitioning().partition_count() {
+    // - We can increase parallelism by adding hash partitioning (but NOT if using subset logic,
+    //   as that would break file partition grouping)
+    let needs_repartition = !satisfaction.is_satisfied()
+        || (n_target > input.plan.output_partitioning().partition_count()
+            && !satisfaction.is_subset());

Review Comment:
   This boolean expression got a bit complex. How about factoring its 
sub-expressions out into variables with descriptive names? That could help 
readability.
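   
   For instance, roughly like this. It is a standalone sketch: the function 
takes plain values instead of the real plan and satisfaction types, and the 
variable names are only suggestions:
   
   ```rust
   /// Standalone version of the condition, with each sub-expression named.
   /// `is_satisfied` and `is_subset` stand in for the results of
   /// `satisfaction.is_satisfied()` and `satisfaction.is_subset()`.
   pub fn needs_repartition(
       is_satisfied: bool,
       is_subset: bool,
       n_target: usize,
       input_partitions: usize,
   ) -> bool {
       let requirement_unmet = !is_satisfied;
       let would_increase_parallelism = n_target > input_partitions;
       let satisfied_via_subset = is_subset;

       // Same logic as in the diff, just spelled out with names.
       requirement_unmet || (would_increase_parallelism && !satisfied_via_subset)
   }
   ```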



##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -1246,12 +1265,19 @@ pub fn ensure_distribution(
                 hash_necessary,
             },
         )| {
+            let increases_partition_count =
+                child.plan.output_partitioning().partition_count() < target_partitions;
+
             let add_roundrobin = enable_round_robin
                 // Operator benefits from partitioning (e.g. filter):
                 && roundrobin_beneficial
                 && roundrobin_beneficial_stats
                 // Unless partitioning increases the partition count, it is not beneficial:
-                && child.plan.output_partitioning().partition_count() < target_partitions;
+                && increases_partition_count;
+
+            let allow_subset_partition = !repartition_subset_satisfactions
+                && !is_partitioned_join
+                && !increases_partition_count;

Review Comment:
   I'm a bit confused about this statement. Do we only set 
`allow_subset_partition` to true if the user set 
`repartition_subset_satisfactions` to false? If 
`repartition_subset_satisfactions == false`, it means that the user does not 
want to allow subset repartitions, right? In that case 
`allow_subset_partition` should also be `false`.
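   
   To make the question concrete, this standalone sketch isolates the 
expression exactly as written in the hunk above, with the three inputs passed 
as plain booleans. It takes no position on which polarity is intended; it only 
shows what the current code evaluates to:
   
   ```rust
   /// The expression from the diff, lifted into a free function so it can be
   /// evaluated in isolation. The function wrapper is an illustration only.
   pub fn allow_subset_partition(
       repartition_subset_satisfactions: bool,
       is_partitioned_join: bool,
       increases_partition_count: bool,
   ) -> bool {
       !repartition_subset_satisfactions
           && !is_partitioned_join
           && !increases_partition_count
   }
   ```
   
   As written, the result can only be `true` when 
`repartition_subset_satisfactions` is `false`, which is the case I am asking 
about.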



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

