gruuya commented on code in PR #7180:
URL: https://github.com/apache/arrow-datafusion/pull/7180#discussion_r1285004993
##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -253,11 +251,27 @@ impl ExternalSorter {
/// Appends an unsorted [`RecordBatch`] to `in_mem_batches`
///
/// Updates memory usage metrics, and possibly triggers spilling to disk
- async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
+ async fn insert_batch(&mut self, mut input: RecordBatch) -> Result<()> {
if input.num_rows() == 0 {
return Ok(());
}
+ let mut batch_sorted = false;
+ if self.fetch.map_or(false, |f| f < input.num_rows()) {
+ // Eagerly sort the batch to potentially reduce the number of rows
+ // after applying the fetch parameter; first perform a memory reservation
+ // for the sorting procedure.
+ let mut reservation =
Review Comment:
I think the reasoning for the new consumer was that we only want to reserve
a bit of memory briefly, to account for the overhead of keeping both the sorted
and the original batch alive at the same time. Given that in this case we drop
the old batch as soon as possible (due to the `input` re-assignment) in favor
of a smaller/truncated one, I'd agree that a new consumer is not needed here.
(The same could be said about the consumer/reservation in `sort_batch_stream`,
though in that case, since there's no guarantee of a `LIMIT`, we could end up
holding two same-sized batches at one point in time.)

That said, `reservation.try_grow(input.get_array_memory_size())` does then get
called immediately below to check whether the new batch can be kept in memory
without exceeding the configured limit, so I don't think there's a need for
another `try_grow` prior to that.
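To make the transient-reservation reasoning concrete, here is a minimal,
self-contained sketch of the pattern being discussed. The `Reservation`,
`Batch`, and `sort_and_truncate` types below are toy stand-ins invented for
illustration, not the actual DataFusion `MemoryReservation` / `RecordBatch`
API: while sorting, both the original and the (fetch-truncated) sorted batch
are briefly alive, so we grow the reservation for the copy and shrink it back
once the original is dropped.

```rust
/// Toy memory reservation: tracks bytes used against a fixed limit.
/// (Hypothetical; the real DataFusion type is `MemoryReservation`.)
#[derive(Debug)]
struct Reservation {
    used: usize,
    limit: usize,
}

impl Reservation {
    /// Reserve `bytes` more, failing if that would exceed the limit.
    fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
        if self.used + bytes > self.limit {
            Err(format!(
                "would exceed limit: {} + {} > {}",
                self.used, bytes, self.limit
            ))
        } else {
            self.used += bytes;
            Ok(())
        }
    }

    /// Release `bytes` back to the pool.
    fn shrink(&mut self, bytes: usize) {
        self.used -= bytes;
    }
}

/// Toy stand-in for a `RecordBatch`: just a row count with a fixed row width.
struct Batch {
    rows: usize,
}

impl Batch {
    fn mem_size(&self) -> usize {
        self.rows * 8 // pretend each row occupies 8 bytes
    }
}

/// Eagerly "sort" and truncate `input` to at most `fetch` rows, briefly
/// reserving memory for the period where both batches are alive.
fn sort_and_truncate(
    input: Batch,
    fetch: usize,
    r: &mut Reservation,
) -> Result<Batch, String> {
    // The sorted, fetch-truncated copy exists alongside the original for a
    // moment: reserve its share up front.
    let sorted = Batch {
        rows: input.rows.min(fetch),
    };
    r.try_grow(sorted.mem_size())?;

    // Drop the original as soon as possible and release its share, so only
    // the smaller truncated batch stays accounted for.
    let freed = input.mem_size();
    drop(input);
    r.shrink(freed);
    Ok(sorted)
}

fn main() {
    let mut r = Reservation { used: 0, limit: 1024 };
    let input = Batch { rows: 100 };
    // Account for the incoming batch first, as `insert_batch` does.
    r.try_grow(input.mem_size()).unwrap();
    let sorted = sort_and_truncate(input, 10, &mut r).unwrap();
    println!("rows kept: {}, bytes tracked: {}", sorted.rows, r.used);
    // prints "rows kept: 10, bytes tracked: 80"
}
```

Under this model the extra reservation is strictly transient: after the
original batch is dropped, only the truncated batch's footprint remains
tracked, which is why a separate long-lived consumer adds little here.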