gabotechs commented on code in PR #23184:
URL: https://github.com/apache/datafusion/pull/23184#discussion_r3486060060
##########
datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs:
##########
@@ -723,6 +725,7 @@ fn add_hash_on_top(
hash_exprs: Vec<Arc<dyn PhysicalExpr>>,
n_target: usize,
allow_subset_satisfy_partitioning: bool,
+ force_to_target: bool,
Review Comment:
:grimacing: if-driven-development alert
##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -406,37 +391,56 @@ impl Partitioning {
}
}
- /// Returns true when `self` and `other` describe compatible partition maps.
+ /// Returns true when two partitionings both satisfy their own distribution
+ /// requirements and can be paired by partition index.
+ ///
+ /// Use this for multi-input operators, such as partitioned joins, where
+ /// each child has a different schema, required [`Distribution`], and
+ /// expression-equivalence context.
+ ///
+ /// ```text
+ /// # co-partitioned: each side satisfies its own requirement, and boundaries match
+ /// left: Range(left.a ASC, [10, 20]), required KeyPartitioned(left.a)
+ /// right: Range(right.x ASC, [10, 20]), required KeyPartitioned(right.x)
+ ///
+ /// # not compatible: right side does not satisfy a hash-specific requirement
+ /// left: Range(left.a ASC, [10, 20]), required KeyPartitioned(left.a)
+ /// right: Range(right.x ASC, [10, 20]), required HashPartitioned(right.x)
///
- /// Compatible partition maps can be used for partition-local behavior: if
- /// this returns true, partition `i` from both partitionings can be treated
- /// as covering the same partition domain. This is stricter than
- /// [`Self::satisfaction`], which only answers whether this partitioning can
- /// satisfy a required distribution.
- pub fn compatible_with(
+ /// # not compatible: boundaries differ
+ /// left: Range(left.a ASC, [10, 20]), required KeyPartitioned(left.a)
+ /// right: Range(right.x ASC, [15, 20]), required KeyPartitioned(right.x)
+ /// ```
+ pub fn co_partitioned_with(
&self,
- other: &Self,
+ required: &Distribution,
eq_properties: &EquivalenceProperties,
+ other: &Self,
+ other_required: &Distribution,
+ other_eq_properties: &EquivalenceProperties,
) -> bool {
+ if !self
+ .satisfaction(required, eq_properties, false)
+ .is_satisfied()
+ || !other
+ .satisfaction(other_required, other_eq_properties, false)
+ .is_satisfied()
+ {
+ return false;
+ }
+
if self.partition_count() == 1 && other.partition_count() == 1 {
return true;
}
+ if self.partition_count() != other.partition_count() {
+ return false;
+ }
+
match (self, other) {
- (
- Partitioning::Hash(left_exprs, left_count),
- Partitioning::Hash(right_exprs, right_count),
- ) => {
- if left_count != right_count {
- return false;
- }
- if left_exprs.is_empty() || right_exprs.is_empty() {
- return false;
- }
- equivalent_exprs(left_exprs, right_exprs, eq_properties)
- }
+ (Partitioning::Hash(_, _), Partitioning::Hash(_, _)) => true,
Review Comment:
:thinking: Is this correct?
What if the left `Partitioning::Hash` contains 1 expression but the right
`Partitioning::Hash` contains 6 completely different expressions? I'm not sure
if returning `true` in that case is correct.
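To make the concern concrete: which partition a row lands in depends on exactly which expressions are hashed, so two `Hash` partitionings with the same count are not automatically pairable by index. The minimal sketch below assumes a join on `left.a = right.x`, with the left side partitioned as `Hash([a], 4)` and the right side as `Hash([x, y], 4)`. `toy_hash`, `hash_partition` and `right_partitions_for_key` are hypothetical helpers built on a toy hash, not DataFusion APIs or DataFusion's real hash function.

```rust
/// Toy hash over a row's key values (hypothetical; not the hash DataFusion uses).
fn toy_hash(keys: &[u64]) -> u64 {
    keys.iter()
        .fold(0u64, |h, k| h.wrapping_mul(31).wrapping_add(*k))
}

/// Partition index a row is routed to under `Hash(keys, n)`.
fn hash_partition(keys: &[u64], n: u64) -> u64 {
    toy_hash(keys) % n
}

/// Partitions that receive right-side rows with join key `x == key`, when the
/// right side is partitioned by `Hash([x, y], n)` and `y` takes each value in `ys`.
fn right_partitions_for_key(key: u64, ys: &[u64], n: u64) -> Vec<u64> {
    ys.iter().map(|y| hash_partition(&[key, *y], n)).collect()
}
```

Under this toy hash, the left row with `a = 5` lands in partition 1, while right rows with `x = 5` and `y` in `0..4` land in partitions 3, 0, 1 and 2. Pairing partition `i` with partition `i` would therefore miss three of the four matches. Whether such inputs can actually reach the `(Hash, Hash)` arm depends on the satisfaction pre-check and on what callers pass as required distributions; the sketch only shows why ignoring the hashed expressions is risky.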
##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -406,37 +391,56 @@ impl Partitioning {
+ pub fn co_partitioned_with(
Review Comment:
:+1: I don't think it's a big deal; this is not the typical method external
consumers rely on anyway.
##########
datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs:
##########
@@ -723,6 +725,7 @@ fn add_hash_on_top(
hash_exprs: Vec<Arc<dyn PhysicalExpr>>,
n_target: usize,
allow_subset_satisfy_partitioning: bool,
+ force_to_target: bool,
Review Comment:
I'll keep reading to see if I can suggest something better.
##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -596,17 +642,39 @@ pub enum Distribution {
/// Requires children to be distributed in such a way that the same
/// values of the keys end up in the same partition
HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),
+ /// Requires rows with equal values for the given keys to be colocated in
+ /// the same partition, without requiring a specific partitioning algorithm.
+ ///
+ /// Unlike [`Self::HashPartitioned`], this can be satisfied by non-hash
+ /// partitioning such as range partitioning. A partitioning on a subset of
+ /// these keys can also satisfy this requirement because rows equal on all
+ /// required keys are also equal on any subset.
+ ///
+ /// For multi-input operators, satisfaction alone is not enough: each input
+ /// may satisfy its own key requirement while using incompatible partition
+ /// boundaries. Use [`Partitioning::co_partitioned_with`] before pairing
+ /// partitions by index.
+ KeyPartitioned(Vec<Arc<dyn PhysicalExpr>>),
}
Review Comment:
I'm wondering if there's a way to condense the two, `HashPartitioned` and
`KeyPartitioned`, into just one. Take a look at the
`Distribution::HashPartitioned` docs:
> Requires children to be distributed in such a way that the same values of the keys end up in the same partition
To meet that distribution, I don't think it's really necessary for children
to be partitioned specifically with a hashing algorithm; they can be
partitioned in any way that satisfies that invariant (the same values of the
keys end up in the same partition).
I have yet to read the full PR, but I'm getting the feeling that the previous
`HashPartitioned` naming is just unfortunate, and that it could probably have
been named `KeyPartitioned` from the beginning.
What's your opinion on trying to rename `HashPartitioned` to
`KeyPartitioned` instead?
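The invariant quoted from the docs can be checked without reference to any particular partitioning algorithm. In the minimal sketch below, `colocates` is a hypothetical helper (not DataFusion code) that takes rows keyed by `(a, b)` and an assignment function `assign(row_index, key)`. It returns whether equal keys always land in the same partition. The test uses toy assignments: a modulo "hash" on the full key, the same on the subset key `a`, range boundaries `[10, 20]` on `a`, and round-robin.

```rust
use std::collections::HashMap;

/// Returns true if every pair of rows with equal `(a, b)` keys was assigned the
/// same partition: the invariant the `HashPartitioned` docs state.
/// `assign(row_index, key)` returns the partition a row is sent to.
fn colocates<F>(rows: &[(i64, i64)], assign: F) -> bool
where
    F: Fn(usize, (i64, i64)) -> usize,
{
    let mut seen: HashMap<(i64, i64), usize> = HashMap::new();
    for (i, &key) in rows.iter().enumerate() {
        let p = assign(i, key);
        // `insert` hands back the partition previously recorded for this key.
        if let Some(prev) = seen.insert(key, p) {
            if prev != p {
                return false;
            }
        }
    }
    true
}
```

Any assignment that is a function of the key columns, or of a subset of them, passes. Round-robin fails because it depends on row position rather than on the key. This matches the reading that the requirement is about colocation, not about hashing specifically.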
##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -484,51 +529,52 @@ impl Partitioning {
Distribution::SinglePartition if self.partition_count() == 1 => {
PartitioningSatisfaction::Exact
}
- // When partition count is 1, hash requirement is satisfied.
- Distribution::HashPartitioned(_) if self.partition_count() == 1 => {
+ // When partition count is 1, partitioned requirements are satisfied.
+ Distribution::HashPartitioned(_) | Distribution::KeyPartitioned(_)
+ if self.partition_count() == 1 =>
+ {
PartitioningSatisfaction::Exact
}
+ Distribution::KeyPartitioned(required_exprs) => match self {
+ Partitioning::Hash(partition_exprs, _) => Self::key_expr_satisfaction(
+ partition_exprs,
+ required_exprs,
+ eq_properties,
+ allow_subset,
+ ),
+ Partitioning::Range(range) => {
+ let partition_exprs = range
+ .ordering
+ .iter()
+ .map(|sort_expr| Arc::clone(&sort_expr.expr))
+ .collect::<Vec<_>>();
+ Self::key_expr_satisfaction(
+ &partition_exprs,
+ required_exprs,
+ eq_properties,
+ allow_subset,
+ )
+ }
+ Partitioning::RoundRobinBatch(_)
+ | Partitioning::UnknownPartitioning(_) => {
+ PartitioningSatisfaction::NotSatisfied
+ }
+ },
Distribution::HashPartitioned(required_exprs) => match self {
// Here we do not check the partition count for hash partitioning and assumes the partition count
// and hash functions in the system are the same. In future if we plan to support storage partition-wise joins,
// then we need to have the partition count and hash functions validation.
- Partitioning::Hash(partition_exprs, _) => {
- // Empty hash partitioning is invalid
- if partition_exprs.is_empty() || required_exprs.is_empty() {
- return PartitioningSatisfaction::NotSatisfied;
- }
-
- if equivalent_exprs(required_exprs, partition_exprs, eq_properties) {
- return PartitioningSatisfaction::Exact;
- }
-
- let eq_groups = eq_properties.eq_group();
- if !eq_groups.is_empty() {
- if allow_subset {
- let normalized_partition_exprs =
- normalize_exprs(partition_exprs, eq_properties);
- let normalized_required_exprs =
- normalize_exprs(required_exprs, eq_properties);
- if Self::is_subset_partitioning(
- &normalized_partition_exprs,
- &normalized_required_exprs,
- ) {
- return PartitioningSatisfaction::Subset;
- }
- }
- } else if allow_subset
- && Self::is_subset_partitioning(partition_exprs, required_exprs)
- {
- return PartitioningSatisfaction::Subset;
- }
-
- PartitioningSatisfaction::NotSatisfied
- }
+ Partitioning::Hash(partition_exprs, _) => Self::key_expr_satisfaction(
+ partition_exprs,
+ required_exprs,
+ eq_properties,
+ allow_subset,
+ ),
Partitioning::RoundRobinBatch(_)
Review Comment:
The fact that satisfying both `Distribution::KeyPartitioned` and
`Distribution::HashPartitioned` with `Partitioning::Hash` requires exactly the
same code reinforces my comment above. The longer I look, the more I think we
should collapse `Distribution::HashPartitioned` and
`Distribution::KeyPartitioned` into just one.
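To illustrate what a single key-based requirement could look like, here is a toy model in which `Hash` and `Range` partitionings share one satisfaction path. Everything here is hypothetical and simplified. Column names stand in for `PhysicalExpr`s. Equivalence properties are ignored. Keys are compared without regard to order. The single-partition shortcut is omitted. The model is not the PR's `key_expr_satisfaction`, only a sketch of the shape the suggestion implies.

```rust
/// Outcome of checking a partitioning against a key requirement (toy model).
#[derive(Debug, PartialEq)]
enum Satisfaction {
    Exact,
    Subset,
    NotSatisfied,
}

/// Hypothetical, simplified partitioning schemes.
#[allow(dead_code)]
enum Partitioning {
    /// Hash over the given columns into a fixed number of partitions.
    Hash(Vec<&'static str>, usize),
    /// Range over the given sort columns with explicit boundaries.
    Range(Vec<&'static str>, Vec<i64>),
    RoundRobin(usize),
}

/// One check shared by every key-based scheme: partition keys must be drawn
/// from the required keys (all of them for `Exact`, some of them for `Subset`).
fn key_satisfaction(partition_keys: &[&str], required: &[&str], allow_subset: bool) -> Satisfaction {
    if partition_keys.is_empty() || required.is_empty() {
        return Satisfaction::NotSatisfied;
    }
    let covered = partition_keys.iter().all(|k| required.contains(k));
    let complete = required.iter().all(|k| partition_keys.contains(k));
    if covered && complete {
        Satisfaction::Exact
    } else if covered && allow_subset {
        Satisfaction::Subset
    } else {
        Satisfaction::NotSatisfied
    }
}

/// A single `KeyPartitioned`-style requirement: hash and range both qualify.
fn satisfies_key_partitioned(p: &Partitioning, required: &[&str], allow_subset: bool) -> Satisfaction {
    match p {
        Partitioning::Hash(keys, _) | Partitioning::Range(keys, _) => {
            key_satisfaction(keys, required, allow_subset)
        }
        Partitioning::RoundRobin(_) => Satisfaction::NotSatisfied,
    }
}
```

In this model the hash-versus-range distinction only matters when extracting the key columns. That extraction step is where the diff's `Partitioning::Range` arm maps `range.ordering` to expressions.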
##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -406,37 +391,56 @@ impl Partitioning {
+ pub fn co_partitioned_with(
Review Comment:
Without looking at the usages of this method, I do notice that the
signature seems a bit off: why would it need the required `Distribution` to
check whether two `Partitioning`s are co-partitioned? It seems unrelated.
I'll keep reading, though.
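In the diff as shown, the required `Distribution`s only feed the per-side `satisfaction` pre-check. That pre-check is what rejects the doc comment's `HashPartitioned(right.x)` case. The boundary comparison itself needs only the two partitionings. The minimal sketch below covers the doc comment's "boundaries differ" case. It assumes that each boundary starts a new partition, so `[10, 20]` yields `(-inf, 10)`, `[10, 20)` and `[20, +inf)`. That convention, and the helpers `range_partition` and `same_boundaries`, are hypothetical.

```rust
/// Partition index for `value` under range partitioning with sorted `bounds`
/// (assumed convention: each boundary starts a new partition).
fn range_partition(value: i64, bounds: &[i64]) -> usize {
    // Number of boundaries <= value; `partition_point` requires `bounds` sorted.
    bounds.partition_point(|b| *b <= value)
}

/// Two range partitionings map every key to the same partition index only
/// if their boundaries are identical.
fn same_boundaries(left: &[i64], right: &[i64]) -> bool {
    left == right
}
```

With boundaries `[10, 20]` on the left and `[15, 20]` on the right, the key `12` goes to partition 1 on the left but partition 0 on the right. Pairing partitions by index would therefore miss that match.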
##########
datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs:
##########
@@ -723,6 +725,7 @@ fn add_hash_on_top(
hash_exprs: Vec<Arc<dyn PhysicalExpr>>,
n_target: usize,
allow_subset_satisfy_partitioning: bool,
+ force_to_target: bool,
Review Comment:
Ok, it's not that bad, but at this point I don't think `add_hash_on_top`
is working well as an abstraction.
I would expect this function to add a hash repartition on top, but it might
not: whether it does depends on some conditional logic. There are relatively
good docs that make up for it, but rather than documenting the weirdness, it
might be worth not having it at all.
If you ask me, I'd just remove this function and inline the body in the only
place where it's used:
<details><summary>Code suggestion</summary>
```rust
Distribution::HashPartitioned(exprs)
| Distribution::KeyPartitioned(exprs) => 'curr_match_branch: {
    // See https://github.com/apache/datafusion/issues/18341#issuecomment-3503238325 for background
    // When inserting hash is necessary to satisfy hash requirement, insert hash repartition.
    if !hash_necessary {
        break 'curr_match_branch;
    }
    // Early return if hash repartition is unnecessary
    // `RepartitionExec: partitioning=Hash([...], 1), input_partitions=1` is unnecessary.
    if target_partitions == 1 && current_partitions == 1 {
        break 'curr_match_branch;
    }
    let is_satisfied = child
        .plan
        .output_partitioning()
        .satisfaction(
            &Distribution::HashPartitioned(exprs.to_vec()),
            child.plan.equivalence_properties(),
            allow_subset_satisfy_partitioning,
        )
        .is_satisfied();
    // Add hash repartitioning when:
    // - When subset satisfaction is enabled (current >= threshold): only repartition if not satisfied
    // - When below threshold (current < threshold): repartition if expressions don't match OR to increase parallelism
    let needs_repartition = if force_hash_to_target {
        !is_satisfied || target_partitions != current_partitions
    } else if allow_subset_satisfy_partitioning {
        !is_satisfied
    } else {
        !is_satisfied || target_partitions > current_partitions
    };
    if needs_repartition {
        let part = Partitioning::Hash(exprs.to_vec(), target_partitions);
        child.plan = Arc::new(
            RepartitionExec::try_new(child.plan, part)?
                .with_preserve_order(),
        );
    }
}
```
</details>
This trims a fair amount of code (35 insertions(+), 81 deletions(-)) and is
easier to follow, since readers do not need to jump back and forth.
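The branching in the suggested block can be checked in isolation. It decides when to insert the repartition, and it needs only a handful of flags and partition counts. The sketch below restates that decision as a pure function for illustration only; the suggestion keeps it inline. `needs_hash_repartition` is a hypothetical name. Its parameters mirror the suggestion's locals, and `is_satisfied` stands for the result of the `satisfaction(...).is_satisfied()` call.

```rust
/// Decision logic of the suggested inline block, as a pure function
/// (hypothetical extraction; parameter names mirror the suggestion's locals).
fn needs_hash_repartition(
    hash_necessary: bool,
    is_satisfied: bool,
    allow_subset_satisfy_partitioning: bool,
    force_hash_to_target: bool,
    current_partitions: usize,
    target_partitions: usize,
) -> bool {
    if !hash_necessary {
        return false;
    }
    // `RepartitionExec: partitioning=Hash([...], 1), input_partitions=1` is unnecessary.
    if target_partitions == 1 && current_partitions == 1 {
        return false;
    }
    if force_hash_to_target {
        // Repartition to reach exactly the target count, growing or shrinking.
        !is_satisfied || target_partitions != current_partitions
    } else if allow_subset_satisfy_partitioning {
        // Subset satisfaction enabled: only repartition if not satisfied.
        !is_satisfied
    } else {
        // Below the threshold: also repartition to increase parallelism.
        !is_satisfied || target_partitions > current_partitions
    }
}
```

Only the forced branch repartitions when the input already has more partitions than the target.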
##########
datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs:
##########
@@ -723,6 +725,7 @@ fn add_hash_on_top(
hash_exprs: Vec<Arc<dyn PhysicalExpr>>,
n_target: usize,
allow_subset_satisfy_partitioning: bool,
+ force_to_target: bool,
Review Comment:
This is a nit, though; what you have right now in this PR is also good, so
only take the suggestion if you want.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.