mustafasrepo commented on code in PR #11875:
URL: https://github.com/apache/datafusion/pull/11875#discussion_r1711603470


##########
datafusion/core/src/physical_optimizer/enforce_distribution.rs:
##########
@@ -1031,6 +1031,75 @@ fn replace_order_preserving_variants(
     context.update_plan_from_children()
 }
 
+/// A struct to keep track of RepartitionRequirement status for each child node.
+struct RepartitionRequirementStatus {
+    roundrobin_beneficial: bool,
+    hash_necessary: bool,
+}
+
+/// Calculates the `RepartitionRequirementStatus` for each child to generate consistent requirements.
+/// As an example, for a hash join the left child might produce `RepartitionRequirementStatus{roundrobin_beneficial: true, hash_necessary: true}`
+/// and the right child might produce `RepartitionRequirementStatus{roundrobin_beneficial: false, hash_necessary: false}`.
+/// When target partitions=4, the left child might be repartitioned with `Hash(vec![expr], 4)` while the right child is not hash-repartitioned at all. However,
+/// for correct operation we need consistent hashes across children. This util turns the right child's status
+/// from `RepartitionRequirementStatus{roundrobin_beneficial: false, hash_necessary: false}` into
+/// `RepartitionRequirementStatus{roundrobin_beneficial: false, hash_necessary: true}` to generate a consistent plan.
+fn get_repartition_status_flags(
+    requirements: &[Distribution],
+    children: &[&Arc<dyn ExecutionPlan>],
+    batch_size: usize,
+    should_use_estimates: bool,
+) -> Result<Vec<RepartitionRequirementStatus>> {
+    debug_assert_eq!(requirements.len(), children.len());
+    let mut repartition_status_flags = vec![];
+    for (child, requirement) in children.iter().zip(requirements) {
+        // Round robin repartitioning is not needed when the returned row count is not greater than the batch size.
+        let num_rows = child.statistics()?.num_rows;
+        let roundrobin_beneficial = if let Some(n_rows) = num_rows.get_value() {
+            // Adding repartitioning is desirable when the row count is larger than the batch size.
+            // Depending on the `should_use_estimates` flag, this decision uses either both exact and
+            // inexact row counts or only exact row counts; an untrusted count defaults to `true`.
+            if should_use_estimates || num_rows.is_exact().unwrap() {
+                *n_rows > batch_size
+            } else {
+                true
+            }
+        } else {
+            true
+        };
+        let is_hash = matches!(requirement, Distribution::HashPartitioned(_));
+        let mut hash_necessary = false;
+        if is_hash && child.output_partitioning().partition_count() > 1 {
+            // When the input has more than one partition and the requirement is
+            // hash partitioning, re-partitioning is desired.
+            hash_necessary = true;

Review Comment:
   Indeed, you are right. This is a necessity rather than an optimization. I updated the comment to reflect that.
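
For reference, the per-child decision in the quoted hunk can be modeled in isolation. This is a minimal, hypothetical sketch: `RowCount` and `Requirement` are simplified stand-ins invented here for DataFusion's `Precision<usize>` row-count statistic and `Distribution`, and `child_status` is not a function in the PR. It only covers the branches visible in the hunk, which is truncated, so any later logic is not modeled.

```rust
// Hypothetical, simplified stand-ins for DataFusion's row-count statistic
// (`Precision<usize>`) and `Distribution`; NOT the real DataFusion types.
#[derive(Debug, Clone, Copy)]
enum RowCount {
    Exact(usize),
    Inexact(usize),
    Absent,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum Requirement {
    Hash,
    Other,
}

#[derive(Debug, Clone, Copy, PartialEq)]
struct RepartitionRequirementStatus {
    roundrobin_beneficial: bool,
    hash_necessary: bool,
}

/// Hypothetical helper mirroring the per-child branches of the quoted hunk.
fn child_status(
    row_count: RowCount,
    requirement: Requirement,
    partition_count: usize,
    batch_size: usize,
    should_use_estimates: bool,
) -> RepartitionRequirementStatus {
    let roundrobin_beneficial = match row_count {
        // Exact counts are always trusted.
        RowCount::Exact(n) => n > batch_size,
        // Inexact counts are trusted only when estimates are allowed.
        RowCount::Inexact(n) if should_use_estimates => n > batch_size,
        // Untrusted or missing statistics: assume round robin helps.
        RowCount::Inexact(_) | RowCount::Absent => true,
    };
    // A hash requirement over an input with more than one partition
    // cannot be satisfied without hash repartitioning.
    let hash_necessary = requirement == Requirement::Hash && partition_count > 1;
    RepartitionRequirementStatus {
        roundrobin_beneficial,
        hash_necessary,
    }
}

fn main() {
    let batch_size = 8192;
    // The inexact estimate of 10 rows is ignored, so round robin stays enabled.
    println!("{:?}", child_status(RowCount::Inexact(10), Requirement::Hash, 4, batch_size, false));
    // Exactly 10 rows on a single partition: neither flag is set.
    println!("{:?}", child_status(RowCount::Exact(10), Requirement::Hash, 1, batch_size, false));
    // No statistics and no hash requirement.
    println!("{:?}", child_status(RowCount::Absent, Requirement::Other, 4, batch_size, true));
}
```

Modeling the statistic as an enum makes the three cases (exact, inexact, absent) explicit, which is what the nested `if let` / `is_exact().unwrap()` in the hunk encodes.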


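The alignment step described in the doc comment (turning the right child's `hash_necessary: false` into `true`) is cut off in the quoted hunk, so the following is only a sketch under one assumption: once any child needs hash repartitioning, every child that carries a hash requirement is marked the same way, so all sides get the same `Hash(exprs, target_partitions)` scheme. `align_hash_flags` and the `is_hash` slice are hypothetical names, not code from the PR.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
struct RepartitionRequirementStatus {
    roundrobin_beneficial: bool,
    hash_necessary: bool,
}

/// Hypothetical alignment step: `is_hash[i]` says whether child `i` carries a
/// hash-partitioning requirement. If any child already needs hash
/// repartitioning, every hash-required child is marked as needing it too.
fn align_hash_flags(statuses: &mut [RepartitionRequirementStatus], is_hash: &[bool]) {
    debug_assert_eq!(statuses.len(), is_hash.len());
    if statuses.iter().any(|s| s.hash_necessary) {
        for (status, &hash) in statuses.iter_mut().zip(is_hash) {
            if hash {
                status.hash_necessary = true;
            }
        }
    }
}

fn main() {
    // The doc comment's example: the left child needs hash repartitioning,
    // while the right child, on its own, would not.
    let mut statuses = [
        RepartitionRequirementStatus { roundrobin_beneficial: true, hash_necessary: true },
        RepartitionRequirementStatus { roundrobin_beneficial: false, hash_necessary: false },
    ];
    align_hash_flags(&mut statuses, &[true, true]);
    // prints RepartitionRequirementStatus { roundrobin_beneficial: false, hash_necessary: true }
    println!("{:?}", statuses[1]);
}
```

Only children with a hash requirement are flipped here; whether the PR treats non-hash children the same way cannot be told from the truncated hunk.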

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
