milenkovicm commented on code in PR #18014:
URL: https://github.com/apache/datafusion/pull/18014#discussion_r2422983800
##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1007,12 +1071,37 @@ impl RepartitionExec {
let timer = metrics.send_time[partition].timer();
// if there is still a receiver, send to it
- if let Some((tx, reservation)) = output_channels.get_mut(&partition) {
- reservation.lock().try_grow(size)?;
-
- if tx.send(Some(Ok(batch))).await.is_err() {
+ if let Some((tx, reservation, spill_manager)) =
+ output_channels.get_mut(&partition)
+ {
+ let (batch_to_send, is_memory_batch) =
+ match reservation.lock().try_grow(size) {
+ Ok(_) => {
+ // Memory available - send in-memory batch
+ (RepartitionBatch::Memory(batch), true)
+ }
+ Err(_) => {
+ // We're memory limited - spill this single batch to its own file
Review Comment:
Shuffle exchange in Spark or Ballista spills the whole repartition to disk,
if I'm not mistaken. I guess spilling in this case would not be much different,
apart from the fact that here it is triggered by memory constraints.
Maybe we could learn something from the new real-time mode in Spark:
https://docs.google.com/document/d/1jpmZHE0ut_z2R47w3ZEnTH8QfBrUOVzr/edit?usp=drivesdk&ouid=100985621841963541010&rtpof=true&sd=true
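For reference, the per-batch fallback in the hunk above can be sketched with the standard library only. This is a rough illustration, not the PR's code: `Budget`, `RoutedBatch`, and `route` are hypothetical stand-ins for the memory reservation, `RepartitionBatch`, and the send path, and the "spill file" is just an in-memory vector.

```rust
// Hypothetical stand-in for RepartitionBatch: either the batch itself,
// or the index of the (pretend) spill file it was written to.
#[derive(Debug, PartialEq)]
enum RoutedBatch {
    Memory(Vec<u8>),
    Spilled(usize),
}

// Hypothetical stand-in for a memory reservation with a hard byte limit.
struct Budget {
    limit: usize,
    used: usize,
}

impl Budget {
    // Reserve `size` bytes, or fail without changing state if over the limit.
    fn try_grow(&mut self, size: usize) -> Result<(), String> {
        if self.used + size > self.limit {
            return Err(format!("cannot reserve {} bytes", size));
        }
        self.used += size;
        Ok(())
    }
}

// Per-batch policy: keep the batch in memory when the reservation succeeds,
// otherwise write this single batch to its own spill file.
fn route(
    budget: &mut Budget,
    spill_files: &mut Vec<Vec<u8>>,
    batch: Vec<u8>,
) -> RoutedBatch {
    match budget.try_grow(batch.len()) {
        Ok(()) => RoutedBatch::Memory(batch),
        Err(_) => {
            spill_files.push(batch);
            RoutedBatch::Spilled(spill_files.len() - 1)
        }
    }
}
```

In this sketch only the batches that fail the reservation go to disk, whereas a shuffle-style exchange would write every batch regardless of memory pressure.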
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]