gabotechs commented on code in PR #22159:
URL: https://github.com/apache/datafusion/pull/22159#discussion_r3241039657


##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -736,6 +697,63 @@ impl BatchPartitioner {
             BatchPartitionerState::Hash { indices, .. } => indices.len(),
         }
     }
+
+    /// Build repartitioned hash output batches using one `take` per input 
batch.
+    ///
+    /// The hash router first fills one index vector per output partition. 
This method
+    /// concatenates those index vectors, performs one grouped `take_arrays`, 
and
+    /// then returns each output partition as a slice of the reordered batch.
+    ///
+    /// For example, given partition indices:
+    ///
+    /// ```text
+    /// partition 0: [2, 5]
+    /// partition 1: []
+    /// partition 2: [0, 3, 4]
+    /// ```
+    ///
+    /// this method takes rows in `[2, 5, 0, 3, 4]` order once, then returns
+    /// `partition 0 = slice(0, 2)` and `partition 2 = slice(2, 3)`.
+    fn partition_grouped_take(
+        batch: &RecordBatch,
+        indices: &mut [Vec<u32>],
+        timer: &metrics::Time,
+    ) -> Result<Vec<Result<(usize, RecordBatch)>>> {
+        let mut partition_ranges = Vec::with_capacity(indices.len());
+        let mut reordered_indices = Vec::with_capacity(batch.num_rows());
+
+        for (partition, p_indices) in indices.iter_mut().enumerate() {
+            if p_indices.is_empty() {
+                continue;
+            }
+
+            let start = reordered_indices.len();
+            reordered_indices.extend_from_slice(p_indices);
+            partition_ranges.push((partition, start, p_indices.len()));
+            p_indices.clear();
+        }
+
+        if reordered_indices.is_empty() {
+            return Ok(vec![]);
+        }
+
+        let indices_array: PrimitiveArray<UInt32Type> = 
reordered_indices.into();
+        let reordered_batch = {
+            let _timer = timer.timer();
+            let columns = take_arrays(batch.columns(), &indices_array, None)?;
+
+            let mut options = RecordBatchOptions::new();
+            options = options.with_row_count(Some(indices_array.len()));
+            RecordBatch::try_new_with_options(batch.schema(), columns, 
&options).unwrap()

Review Comment:
   This `.unwrap()` was already there before this change, but this may be a good 
opportunity to replace it with `?` and return the error instead of panicking.



##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -736,6 +697,63 @@ impl BatchPartitioner {
             BatchPartitionerState::Hash { indices, .. } => indices.len(),
         }
     }
+
+    /// Build repartitioned hash output batches using one `take` per input 
batch.
+    ///
+    /// The hash router first fills one index vector per output partition. 
This method
+    /// concatenates those index vectors, performs one grouped `take_arrays`, 
and
+    /// then returns each output partition as a slice of the reordered batch.
+    ///
+    /// For example, given partition indices:
+    ///
+    /// ```text
+    /// partition 0: [2, 5]
+    /// partition 1: []
+    /// partition 2: [0, 3, 4]
+    /// ```
+    ///
+    /// this method takes rows in `[2, 5, 0, 3, 4]` order once, then returns
+    /// `partition 0 = slice(0, 2)` and `partition 2 = slice(2, 3)`.
+    fn partition_grouped_take(
+        batch: &RecordBatch,
+        indices: &mut [Vec<u32>],
+        timer: &metrics::Time,
+    ) -> Result<Vec<Result<(usize, RecordBatch)>>> {
+        let mut partition_ranges = Vec::with_capacity(indices.len());
+        let mut reordered_indices = Vec::with_capacity(batch.num_rows());
+
+        for (partition, p_indices) in indices.iter_mut().enumerate() {
+            if p_indices.is_empty() {
+                continue;
+            }
+
+            let start = reordered_indices.len();
+            reordered_indices.extend_from_slice(p_indices);
+            partition_ranges.push((partition, start, p_indices.len()));
+            p_indices.clear();
+        }
+
+        if reordered_indices.is_empty() {
+            return Ok(vec![]);
+        }
+
+        let indices_array: PrimitiveArray<UInt32Type> = 
reordered_indices.into();
+        let reordered_batch = {
+            let _timer = timer.timer();

Review Comment:
   It looks like `timer` is not measuring the per-partition `slice()` operations 
below. They should be pretty cheap and most likely negligible, but it may be 
more accurate to move `let _timer = timer.timer();` a couple of lines up, out of 
this inner block, so that the guard also covers the slicing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to