This is an automated email from the ASF dual-hosted git repository.

xudong963 pushed a commit to branch branch-52
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/branch-52 by this push:
     new d5dae86a6a [branch-52] Split BatchPartitioner::try_new into hash and round-robin constructors (#19681)
d5dae86a6a is described below

commit d5dae86a6a0e087bac4a1ce41e267f91f4c348a7
Author: Mohit rao <[email protected]>
AuthorDate: Thu Jan 8 08:57:26 2026 +0530

    [branch-52] Split BatchPartitioner::try_new into hash and round-robin constructors (#19681)
    
    Backport of #19668 to branch-52.
    
    This PR cherry-picks commit 680ddcc from main.
    
    Includes:
    - Split of BatchPartitioner::try_new into hash and round-robin
    constructors
    - Documentation improvements
    - No behavior changes
    
    
    part of #18566
    
    Co-authored-by: Your Name <[email protected]>
---
 datafusion/physical-plan/src/repartition/mod.rs | 117 +++++++++++++++++++-----
 1 file changed, 94 insertions(+), 23 deletions(-)

diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 1efdaaabc7..d50404c8fc 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -434,33 +434,90 @@ pub const REPARTITION_RANDOM_STATE: SeededRandomState =
     SeededRandomState::with_seeds(0, 0, 0, 0);
 
 impl BatchPartitioner {
-    /// Create a new [`BatchPartitioner`] with the provided [`Partitioning`]
+    /// Create a new [`BatchPartitioner`] for hash-based repartitioning.
     ///
-    /// The time spent repartitioning will be recorded to `timer`
+    /// # Parameters
+    /// - `exprs`: Expressions used to compute the hash for each input row.
+    /// - `num_partitions`: Total number of output partitions.
+    /// - `timer`: Metric used to record time spent during repartitioning.
+    ///
+    /// # Notes
+    /// This constructor cannot fail and performs no validation.
+    pub fn new_hash_partitioner(
+        exprs: Vec<Arc<dyn PhysicalExpr>>,
+        num_partitions: usize,
+        timer: metrics::Time,
+    ) -> Self {
+        Self {
+            state: BatchPartitionerState::Hash {
+                exprs,
+                num_partitions,
+                hash_buffer: vec![],
+            },
+            timer,
+        }
+    }
+
+    /// Create a new [`BatchPartitioner`] for round-robin repartitioning.
+    ///
+    /// # Parameters
+    /// - `num_partitions`: Total number of output partitions.
+    /// - `timer`: Metric used to record time spent during repartitioning.
+    /// - `input_partition`: Index of the current input partition.
+    /// - `num_input_partitions`: Total number of input partitions.
+    ///
+    /// # Notes
+    /// The starting output partition is derived from the input partition
+    /// to avoid skew when multiple input partitions are used.
+    pub fn new_round_robin_partitioner(
+        num_partitions: usize,
+        timer: metrics::Time,
+        input_partition: usize,
+        num_input_partitions: usize,
+    ) -> Self {
+        Self {
+            state: BatchPartitionerState::RoundRobin {
+                num_partitions,
+                next_idx: (input_partition * num_partitions) / num_input_partitions,
+            },
+            timer,
+        }
+    }
+    /// Create a new [`BatchPartitioner`] based on the provided [`Partitioning`] scheme.
+    ///
+    /// This is a convenience constructor that delegates to the specialized
+    /// hash or round-robin constructors depending on the partitioning variant.
+    ///
+    /// # Parameters
+    /// - `partitioning`: Partitioning scheme to apply (hash or round-robin).
+    /// - `timer`: Metric used to record time spent during repartitioning.
+    /// - `input_partition`: Index of the current input partition.
+    /// - `num_input_partitions`: Total number of input partitions.
+    ///
+    /// # Errors
+    /// Returns an error if the provided partitioning scheme is not supported.
     pub fn try_new(
         partitioning: Partitioning,
         timer: metrics::Time,
         input_partition: usize,
         num_input_partitions: usize,
     ) -> Result<Self> {
-        let state = match partitioning {
+        match partitioning {
+            Partitioning::Hash(exprs, num_partitions) => {
+                Ok(Self::new_hash_partitioner(exprs, num_partitions, timer))
+            }
             Partitioning::RoundRobinBatch(num_partitions) => {
-                BatchPartitionerState::RoundRobin {
+                Ok(Self::new_round_robin_partitioner(
                     num_partitions,
-                    // Distribute starting index evenly based on input partition, number of input partitions and number of partitions
-                    // to avoid they all start at partition 0 and heavily skew on the lower partitions
-                    next_idx: ((input_partition * num_partitions) / num_input_partitions),
-                }
+                    timer,
+                    input_partition,
+                    num_input_partitions,
+                ))
             }
-            Partitioning::Hash(exprs, num_partitions) => BatchPartitionerState::Hash {
-                exprs,
-                num_partitions,
-                hash_buffer: vec![],
-            },
-            other => return not_impl_err!("Unsupported repartitioning scheme {other:?}"),
-        };
-
-        Ok(Self { state, timer })
+            other => {
+                not_impl_err!("Unsupported repartitioning scheme {other:?}")
+            }
+        }
     }
 
     /// Partition the provided [`RecordBatch`] into one or more partitioned [`RecordBatch`]
@@ -1245,12 +1302,26 @@ impl RepartitionExec {
         input_partition: usize,
         num_input_partitions: usize,
     ) -> Result<()> {
-        let mut partitioner = BatchPartitioner::try_new(
-            partitioning,
-            metrics.repartition_time.clone(),
-            input_partition,
-            num_input_partitions,
-        )?;
+        let mut partitioner = match &partitioning {
+            Partitioning::Hash(exprs, num_partitions) => {
+                BatchPartitioner::new_hash_partitioner(
+                    exprs.clone(),
+                    *num_partitions,
+                    metrics.repartition_time.clone(),
+                )
+            }
+            Partitioning::RoundRobinBatch(num_partitions) => {
+                BatchPartitioner::new_round_robin_partitioner(
+                    *num_partitions,
+                    metrics.repartition_time.clone(),
+                    input_partition,
+                    num_input_partitions,
+                )
+            }
+            other => {
+                return not_impl_err!("Unsupported repartitioning scheme {other:?}");
+            }
+        };
 
         // While there are still outputs to send to, keep pulling inputs
         let mut batches_until_yield = partitioner.num_partitions();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to