Copilot commented on code in PR #18572:
URL: https://github.com/apache/datafusion/pull/18572#discussion_r2508327605
##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1274,6 +1297,42 @@ impl RepartitionExec {
}
}
+ if is_hash_partitioning {
+ // flush any remaining coalesced batches
+ for (partition, coalesce_batch) in coalesce_batches.iter_mut().enumerate() {
+ coalesce_batch.finish_buffered_batch()?;
+ if let Some(batch) = coalesce_batch.next_completed_batch() {
+ let size = batch.get_array_memory_size();
+ // Check if channel still exists (may have been removed if receiver hung up)
+ if let Some(channel) = output_channels.get_mut(&partition) {
+ let (batch_to_send, is_memory_batch) =
+ match channel.reservation.lock().try_grow(size) {
+ Ok(_) => {
+ // Memory available - send in-memory batch
+ (RepartitionBatch::Memory(batch), true)
+ }
+ Err(_) => {
+ // We're memory limited - spill to SpillPool
+ // SpillPool handles file handle reuse and rotation
+ channel.spill_writer.push_batch(&batch)?;
+ // Send marker indicating batch was spilled
+ (RepartitionBatch::Spilled, false)
+ }
+ };
+
+ if channel.sender.send(Some(Ok(batch_to_send))).await.is_err() {
+ // If the other end has hung up, it was an early shutdown (e.g. LIMIT)
+ // Only shrink memory if it was a memory batch
+ if is_memory_batch {
+ channel.reservation.lock().shrink(size);
+ }
+ output_channels.remove(&partition);
+ }
+ }
Review Comment:
The batch sending logic in this flush section duplicates the logic from
lines 1244-1272. This code duplication makes maintenance harder and increases
the risk of inconsistencies. Consider extracting this logic into a helper
function that both the main loop and flush section can use. The function could
be named something like `send_batch_to_channel` and accept parameters for the
batch, partition, channel, and metrics.
##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1274,6 +1297,42 @@ impl RepartitionExec {
}
}
+ if is_hash_partitioning {
+ // flush any remaining coalesced batches
+ for (partition, coalesce_batch) in coalesce_batches.iter_mut().enumerate() {
+ coalesce_batch.finish_buffered_batch()?;
+ if let Some(batch) = coalesce_batch.next_completed_batch() {
+ let size = batch.get_array_memory_size();
+ // Check if channel still exists (may have been removed if receiver hung up)
+ if let Some(channel) = output_channels.get_mut(&partition) {
+ let (batch_to_send, is_memory_batch) =
+ match channel.reservation.lock().try_grow(size) {
+ Ok(_) => {
+ // Memory available - send in-memory batch
+ (RepartitionBatch::Memory(batch), true)
+ }
+ Err(_) => {
+ // We're memory limited - spill to SpillPool
+ // SpillPool handles file handle reuse and rotation
+ channel.spill_writer.push_batch(&batch)?;
+ // Send marker indicating batch was spilled
+ (RepartitionBatch::Spilled, false)
+ }
+ };
+
+ if channel.sender.send(Some(Ok(batch_to_send))).await.is_err() {
+ // If the other end has hung up, it was an early shutdown (e.g. LIMIT)
+ // Only shrink memory if it was a memory batch
+ if is_memory_batch {
+ channel.reservation.lock().shrink(size);
+ }
+ output_channels.remove(&partition);
+ }
+ }
Review Comment:
The flush section is missing the `send_time` timing metrics that are
recorded in the main loop (line 1246). For consistency and proper performance
monitoring, consider wrapping the sending logic with `let timer =
metrics.send_time[partition].timer();` ... `timer.done();` similar to the main
loop.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]