milenkovicm commented on code in PR #18014:
URL: https://github.com/apache/datafusion/pull/18014#discussion_r2435045075
##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1007,12 +1071,37 @@ impl RepartitionExec {
let timer = metrics.send_time[partition].timer();
// if there is still a receiver, send to it
- if let Some((tx, reservation)) =
output_channels.get_mut(&partition) {
- reservation.lock().try_grow(size)?;
-
- if tx.send(Some(Ok(batch))).await.is_err() {
+ if let Some((tx, reservation, spill_manager)) =
+ output_channels.get_mut(&partition)
+ {
+ let (batch_to_send, is_memory_batch) =
+ match reservation.lock().try_grow(size) {
+ Ok(_) => {
+ // Memory available - send in-memory batch
+ (RepartitionBatch::Memory(batch), true)
+ }
+ Err(_) => {
+ // We're memory limited - spill this single
batch to its own file
Review Comment:
Just one idea, would it be possible to spill before actual repartition? Try
to acquire memory equal to received batch, if it fails do not do repartition,
spill it. That would produce just N file (where N is number of partition)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]