Copilot commented on code in PR #18572:
URL: https://github.com/apache/datafusion/pull/18572#discussion_r2508318004


##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1194,9 +1194,18 @@ impl RepartitionExec {
         partitioning: Partitioning,
         metrics: RepartitionMetrics,
     ) -> Result<()> {
+        let is_hash_partitioning = matches!(&partitioning, Partitioning::Hash(_, _));
         let mut partitioner =
             BatchPartitioner::try_new(partitioning, metrics.repartition_time.clone())?;
 
+        let mut coalesce_batches = vec![];
+
+        if is_hash_partitioning {
+            for _ in 0..partitioner.num_partitions() {
+                coalesce_batches.push(BatchCoalescer::new(stream.schema(), 4096));

Review Comment:
   The hardcoded batch size of 4096 should use the configured `batch_size` from the session config. Other uses of `BatchCoalescer::new` in the codebase use `context.session_config().batch_size()` or `config.execution.batch_size`. Consider passing the batch size as a parameter to `pull_from_input` from the caller (`consume_input_streams`), which has access to the `context: Arc<TaskContext>`.
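   A minimal sketch of the suggested fix, assuming simplified stand-ins (`SessionConfig`, `Coalescer`, and both function signatures below are illustrative, not DataFusion's actual API):

   ```rust
   // Hypothetical, simplified shapes: the point is only that the target batch
   // size flows in from the caller's config rather than being hardcoded.
   struct SessionConfig {
       batch_size: usize,
   }

   struct Coalescer {
       target_batch_size: usize,
   }

   // Before: this function would hardcode 4096 internally.
   // After: the target size arrives as a parameter.
   fn pull_from_input(num_partitions: usize, batch_size: usize) -> Vec<Coalescer> {
       (0..num_partitions)
           .map(|_| Coalescer {
               target_batch_size: batch_size,
           })
           .collect()
   }

   // The caller holds the config (in DataFusion, via `Arc<TaskContext>`), so it
   // is the natural place to read the batch size and pass it down.
   fn consume_input_streams(config: &SessionConfig, num_partitions: usize) -> Vec<Coalescer> {
       pull_from_input(num_partitions, config.batch_size)
   }
   ```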



##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1274,6 +1296,41 @@ impl RepartitionExec {
             }
         }
 
+        if is_hash_partitioning {
+            // flush any remaining coalesced batches
+            for (partition, coalesce_batch) in coalesce_batches.iter_mut().enumerate() {
+                while let Some(batch) = coalesce_batch.next_completed_batch() {
+                    let size = batch.get_array_memory_size();
+                    // Check if channel still exists (may have been removed if receiver hung up)
+                    if let Some(channel) = output_channels.get_mut(&partition) {
+                        let (batch_to_send, is_memory_batch) =
+                            match channel.reservation.lock().try_grow(size) {
+                                Ok(_) => {
+                                    // Memory available - send in-memory batch
+                                    (RepartitionBatch::Memory(batch), true)
+                                }
+                                Err(_) => {
+                                    // We're memory limited - spill to SpillPool
+                                    // SpillPool handles file handle reuse and rotation
+                                    channel.spill_writer.push_batch(&batch)?;
+                                    // Send marker indicating batch was spilled
+                                    (RepartitionBatch::Spilled, false)
+                                }
+                            };
+
+                        if channel.sender.send(Some(Ok(batch_to_send))).await.is_err() {

Review Comment:
   The send timer metric (`metrics.send_time[partition]`) is not tracked for these final flush batches, unlike the main sending logic at line 1245. This will result in inaccurate metrics for hash partitioning operations, as the time spent sending flushed batches won't be recorded.
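   A minimal sketch of the concern, assuming a simplified stand-in for the metric (`SendTimeMetric` and `timed` are illustrative; the real code uses DataFusion's metric types):

   ```rust
   use std::time::{Duration, Instant};

   // Stand-in for the per-partition send-time metric. The flush path should
   // wrap its channel sends in the same timer the main send loop uses, so that
   // flushed batches also count toward `metrics.send_time[partition]`.
   #[derive(Default)]
   struct SendTimeMetric {
       elapsed: Duration,
   }

   impl SendTimeMetric {
       // Times the given send operation and accumulates the wall-clock cost,
       // mirroring a scoped-timer pattern.
       fn timed<T>(&mut self, send: impl FnOnce() -> T) -> T {
           let start = Instant::now();
           let result = send();
           self.elapsed += start.elapsed();
           result
       }
   }
   ```

   In the flush loop, the `channel.sender.send(...)` call would sit inside such a timed scope, matching the main sending logic so both paths report comparable send times.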



##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1274,6 +1296,41 @@ impl RepartitionExec {
             }
         }
 
+        if is_hash_partitioning {
+            // flush any remaining coalesced batches
+            for (partition, coalesce_batch) in coalesce_batches.iter_mut().enumerate() {

Review Comment:
   The flush logic should call `coalesce_batch.finish()` before retrieving 
remaining batches with `next_completed_batch()`. According to the 
`BatchCoalescer` API pattern used elsewhere in the codebase, `finish()` must be 
called to signal end-of-stream and ensure all buffered data is made available. 
Without this, any batches that haven't reached the target size threshold will 
be silently dropped.
   ```suggestion
               for (partition, coalesce_batch) in coalesce_batches.iter_mut().enumerate() {
                   coalesce_batch.finish();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

