This is an automated email from the ASF dual-hosted git repository.
xudong963 pushed a commit to branch branch-52
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/branch-52 by this push:
new d5dae86a6a [branch-52] Split BatchPartitioner::try_new into hash and round-robin constructors (#19681)
d5dae86a6a is described below
commit d5dae86a6a0e087bac4a1ce41e267f91f4c348a7
Author: Mohit rao <[email protected]>
AuthorDate: Thu Jan 8 08:57:26 2026 +0530
[branch-52] Split BatchPartitioner::try_new into hash and round-robin constructors (#19681)
Backport of #19668 to branch-52.
This PR cherry-picks commit 680ddcc from main.
Includes:
- Splitting BatchPartitioner::try_new into separate hash and round-robin constructors
- Documentation improvements
- No behavior changes
Part of #18566
Co-authored-by: Your Name <[email protected]>
---
datafusion/physical-plan/src/repartition/mod.rs | 117 +++++++++++++++++++-----
1 file changed, 94 insertions(+), 23 deletions(-)
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 1efdaaabc7..d50404c8fc 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -434,33 +434,90 @@ pub const REPARTITION_RANDOM_STATE: SeededRandomState =
SeededRandomState::with_seeds(0, 0, 0, 0);
impl BatchPartitioner {
- /// Create a new [`BatchPartitioner`] with the provided [`Partitioning`]
+ /// Create a new [`BatchPartitioner`] for hash-based repartitioning.
///
- /// The time spent repartitioning will be recorded to `timer`
+ /// This is the constructor that [`BatchPartitioner::try_new`] delegates to
+ /// for [`Partitioning::Hash`].
+ ///
+ /// # Parameters
+ /// - `exprs`: Expressions evaluated for each input row to compute its hash.
+ /// - `num_partitions`: Total number of output partitions.
+ /// - `timer`: Metric used to record time spent during repartitioning.
+ ///
+ /// # Notes
+ /// This constructor cannot fail and performs no validation; for example, it
+ /// does not check that `num_partitions` is non-zero. The internal hash
+ /// buffer starts out empty.
+ pub fn new_hash_partitioner(
+ exprs: Vec<Arc<dyn PhysicalExpr>>,
+ num_partitions: usize,
+ timer: metrics::Time,
+ ) -> Self {
+ Self {
+ state: BatchPartitionerState::Hash {
+ exprs,
+ num_partitions,
+ hash_buffer: vec![],
+ },
+ timer,
+ }
+ }
+
+ /// Create a new [`BatchPartitioner`] for round-robin repartitioning.
+ ///
+ /// This is the constructor that [`BatchPartitioner::try_new`] delegates to
+ /// for [`Partitioning::RoundRobinBatch`].
+ ///
+ /// # Parameters
+ /// - `num_partitions`: Total number of output partitions.
+ /// - `timer`: Metric used to record time spent during repartitioning.
+ /// - `input_partition`: Index of the current input partition.
+ /// - `num_input_partitions`: Total number of input partitions.
+ ///
+ /// # Notes
+ /// The starting index (`next_idx`) is
+ /// `(input_partition * num_partitions) / num_input_partitions`. This spreads
+ /// the starting indices of different input partitions evenly over the output
+ /// partitions. Otherwise every input would start at output partition 0 and
+ /// rows would be skewed toward the lower-numbered partitions. When
+ /// `input_partition < num_input_partitions`, the starting index is always
+ /// less than `num_partitions`.
+ ///
+ /// # Panics
+ /// Panics (integer division by zero) if `num_input_partitions` is 0.
+ pub fn new_round_robin_partitioner(
+ num_partitions: usize,
+ timer: metrics::Time,
+ input_partition: usize,
+ num_input_partitions: usize,
+ ) -> Self {
+ Self {
+ state: BatchPartitionerState::RoundRobin {
+ num_partitions,
+ next_idx: (input_partition * num_partitions) /
num_input_partitions,
+ },
+ timer,
+ }
+ }
+ /// Create a new [`BatchPartitioner`] based on the provided [`Partitioning`] scheme.
+ ///
+ /// This is a convenience constructor. It delegates to
+ /// [`Self::new_hash_partitioner`] for [`Partitioning::Hash`] and to
+ /// [`Self::new_round_robin_partitioner`] for
+ /// [`Partitioning::RoundRobinBatch`].
+ ///
+ /// # Parameters
+ /// - `partitioning`: Partitioning scheme to apply (hash or round-robin).
+ /// - `timer`: Metric used to record time spent during repartitioning.
+ /// - `input_partition`: Index of the current input partition (used only for
+ ///   round-robin partitioning).
+ /// - `num_input_partitions`: Total number of input partitions (used only for
+ ///   round-robin partitioning).
+ ///
+ /// # Errors
+ /// Returns a "not implemented" error (built with `not_impl_err!`) for any
+ /// [`Partitioning`] variant other than `Hash` and `RoundRobinBatch`.
+ ///
+ /// # Panics
+ /// For `RoundRobinBatch`, panics on integer division by zero if
+ /// `num_input_partitions` is 0.
pub fn try_new(
partitioning: Partitioning,
timer: metrics::Time,
input_partition: usize,
num_input_partitions: usize,
) -> Result<Self> {
- let state = match partitioning {
+ match partitioning {
+ Partitioning::Hash(exprs, num_partitions) => {
+ Ok(Self::new_hash_partitioner(exprs, num_partitions, timer))
+ }
Partitioning::RoundRobinBatch(num_partitions) => {
- BatchPartitionerState::RoundRobin {
+ Ok(Self::new_round_robin_partitioner(
num_partitions,
- // Distribute starting index evenly based on input
partition, number of input partitions and number of partitions
- // to avoid they all start at partition 0 and heavily skew
on the lower partitions
- next_idx: ((input_partition * num_partitions) /
num_input_partitions),
- }
+ timer,
+ input_partition,
+ num_input_partitions,
+ ))
}
- Partitioning::Hash(exprs, num_partitions) =>
BatchPartitionerState::Hash {
- exprs,
- num_partitions,
- hash_buffer: vec![],
- },
- other => return not_impl_err!("Unsupported repartitioning scheme
{other:?}"),
- };
-
- Ok(Self { state, timer })
+ other => {
+ not_impl_err!("Unsupported repartitioning scheme {other:?}")
+ }
+ }
}
/// Partition the provided [`RecordBatch`] into one or more partitioned
[`RecordBatch`]
@@ -1245,12 +1302,26 @@ impl RepartitionExec {
input_partition: usize,
num_input_partitions: usize,
) -> Result<()> {
- let mut partitioner = BatchPartitioner::try_new(
- partitioning,
- metrics.repartition_time.clone(),
- input_partition,
- num_input_partitions,
- )?;
+ let mut partitioner = match &partitioning {
+ Partitioning::Hash(exprs, num_partitions) => {
+ BatchPartitioner::new_hash_partitioner(
+ exprs.clone(),
+ *num_partitions,
+ metrics.repartition_time.clone(),
+ )
+ }
+ Partitioning::RoundRobinBatch(num_partitions) => {
+ BatchPartitioner::new_round_robin_partitioner(
+ *num_partitions,
+ metrics.repartition_time.clone(),
+ input_partition,
+ num_input_partitions,
+ )
+ }
+ other => {
+ return not_impl_err!("Unsupported repartitioning scheme
{other:?}");
+ }
+ };
// While there are still outputs to send to, keep pulling inputs
let mut batches_until_yield = partitioner.num_partitions();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]