2010YOUY01 commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2191376010
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -1067,14 +1074,13 @@ impl GroupedHashAggregateStream {
                 sort_batch(&batch, &expr, None)
             })),
         )));
-        for spill in self.spill_state.spills.drain(..) {
-            let stream = self.spill_state.spill_manager.read_spill_as_stream(spill)?;
-            streams.push(stream);
-        }
+
         self.spill_state.is_stream_merging = true;
         self.input = StreamingMergeBuilder::new()
             .with_streams(streams)
             .with_schema(schema)
+            .with_spill_manager(self.spill_state.spill_manager.clone())
+            .with_sorted_spill_files(std::mem::take(&mut self.spill_state.spills))
Review Comment:
I suggest spilling all in-memory batches (in `streams`) to disk before this
final merge step, and letting the multi-pass merge operator handle only spill
files, so it doesn't have to deal with in-memory batches and spill files at
the same time.
This is just a simplification for now; we can add an optimization to avoid
this re-spill step in the future.
The issue is that, without special handling, the in-memory batches can take
up most of the available memory budget and leave only a very small share for
the multi-pass spilling to continue. This can cause slowdowns or even prevent
some cases from finishing.
We're already doing this in the sort executor, see:
https://github.com/apache/datafusion/blob/14487ddc275fc1f148f339293664fe7f83d91d09/datafusion/physical-plan/src/sorts/sort.rs#L336-L341
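To make the suggestion concrete, here is a rough standalone sketch of the
re-spill step. It uses Arrow IPC directly as a stand-in for a spill file; the
`respill_sorted_batches` helper, the schema, and the file path are made up
for illustration and are not the actual DataFusion `SpillManager` API:

```rust
// Illustrative sketch only: write already-sorted in-memory batches out as an
// Arrow IPC stream (standing in for a spill file), so the final multi-pass
// merge only has to read files and the in-memory batches release their
// memory budget. `respill_sorted_batches` is a hypothetical helper, not a
// DataFusion API.
use std::fs::File;
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::ipc::writer::StreamWriter;
use arrow::record_batch::RecordBatch;

fn respill_sorted_batches(
    path: &str,
    schema: &Schema,
    batches: Vec<RecordBatch>,
) -> Result<(), ArrowError> {
    // Create the "spill file" and stream every sorted batch into it.
    let file = File::create(path).map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
    let mut writer = StreamWriter::try_new(file, schema)?;
    for batch in batches {
        writer.write(&batch)?;
    }
    // Finish the IPC stream so readers see a complete stream.
    writer.finish()?;
    Ok(())
}

fn main() -> Result<(), ArrowError> {
    // A tiny sorted batch standing in for the aggregation's in-memory output.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int64Array::from(vec![1_i64, 2, 3])) as ArrayRef],
    )?;
    // After this, the merge step would only be handed spill files.
    respill_sorted_batches("/tmp/agg_in_mem_respill.arrow", &schema, vec![batch])?;
    Ok(())
}
```

In the actual change, this would presumably go through
`self.spill_state.spill_manager` and push the resulting file into
`self.spill_state.spills` before `.with_sorted_spill_files(...)` is called,
so the builder never receives in-memory streams.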