Copilot commented on code in PR #18572:
URL: https://github.com/apache/datafusion/pull/18572#discussion_r2508327605


##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1274,6 +1297,42 @@ impl RepartitionExec {
             }
         }
 
+        if is_hash_partitioning {
+            // flush any remaining coalesced batches
+            for (partition, coalesce_batch) in 
coalesce_batches.iter_mut().enumerate() {
+                coalesce_batch.finish_buffered_batch()?;
+                if let Some(batch) = coalesce_batch.next_completed_batch() {
+                    let size = batch.get_array_memory_size();
+                    // Check if channel still exists (may have been removed if 
receiver hung up)
+                    if let Some(channel) = output_channels.get_mut(&partition) 
{
+                        let (batch_to_send, is_memory_batch) =
+                            match channel.reservation.lock().try_grow(size) {
+                                Ok(_) => {
+                                    // Memory available - send in-memory batch
+                                    (RepartitionBatch::Memory(batch), true)
+                                }
+                                Err(_) => {
+                                    // We're memory limited - spill to 
SpillPool
+                                    // SpillPool handles file handle reuse and 
rotation
+                                    channel.spill_writer.push_batch(&batch)?;
+                                    // Send marker indicating batch was spilled
+                                    (RepartitionBatch::Spilled, false)
+                                }
+                            };
+
+                        if 
channel.sender.send(Some(Ok(batch_to_send))).await.is_err() {
+                            // If the other end has hung up, it was an early 
shutdown (e.g. LIMIT)
+                            // Only shrink memory if it was a memory batch
+                            if is_memory_batch {
+                                channel.reservation.lock().shrink(size);
+                            }
+                            output_channels.remove(&partition);
+                        }
+                    }

Review Comment:
   The batch-sending logic in this flush section duplicates the logic in 
lines 1244-1272. This code duplication makes maintenance harder and increases 
the risk of inconsistencies. Consider extracting this logic into a helper 
function that both the main loop and the flush section can use. The function 
could be named something like `send_batch_to_channel` and accept parameters 
for the batch, partition, channel, and metrics.



##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1274,6 +1297,42 @@ impl RepartitionExec {
             }
         }
 
+        if is_hash_partitioning {
+            // flush any remaining coalesced batches
+            for (partition, coalesce_batch) in 
coalesce_batches.iter_mut().enumerate() {
+                coalesce_batch.finish_buffered_batch()?;
+                if let Some(batch) = coalesce_batch.next_completed_batch() {
+                    let size = batch.get_array_memory_size();
+                    // Check if channel still exists (may have been removed if 
receiver hung up)
+                    if let Some(channel) = output_channels.get_mut(&partition) 
{
+                        let (batch_to_send, is_memory_batch) =
+                            match channel.reservation.lock().try_grow(size) {
+                                Ok(_) => {
+                                    // Memory available - send in-memory batch
+                                    (RepartitionBatch::Memory(batch), true)
+                                }
+                                Err(_) => {
+                                    // We're memory limited - spill to 
SpillPool
+                                    // SpillPool handles file handle reuse and 
rotation
+                                    channel.spill_writer.push_batch(&batch)?;
+                                    // Send marker indicating batch was spilled
+                                    (RepartitionBatch::Spilled, false)
+                                }
+                            };
+
+                        if 
channel.sender.send(Some(Ok(batch_to_send))).await.is_err() {
+                            // If the other end has hung up, it was an early 
shutdown (e.g. LIMIT)
+                            // Only shrink memory if it was a memory batch
+                            if is_memory_batch {
+                                channel.reservation.lock().shrink(size);
+                            }
+                            output_channels.remove(&partition);
+                        }
+                    }

Review Comment:
   The flush section is missing the `send_time` timing metrics that are 
recorded in the main loop (line 1246). For consistency and proper performance 
monitoring, consider wrapping the sending logic in a timer, as the main loop 
does: `let timer = metrics.send_time[partition].timer();` ... 
`timer.done();`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to