Dandandan commented on code in PR #3593:
URL: https://github.com/apache/arrow-datafusion/pull/3593#discussion_r978746836
##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -124,6 +124,21 @@ impl ExternalSorter {
// calls to `timer.done()` below.
let _timer = tracking_metrics.elapsed_compute().timer();
let partial = sort_batch(input, self.schema.clone(), &self.expr, self.fetch)?;
+ // The resulting batch might be smaller than the input batch if there
+ // is a propagated limit.
+
+ if self.fetch.is_some() {
+ let new_size = batch_byte_size(&partial.sorted_batch);
+ let size_delta = size.checked_sub(new_size).ok_or_else(|| {
+ DataFusionError::Internal(format!(
+ "The size of the sorted batch is larger than the size of the input batch: {} > {}",
+ new_size,
+ size
+ ))
+ })?;
+ self.shrink(size_delta);
+ self.metrics.mem_used().sub(size_delta);
Review Comment:
Change looks good. Maybe we can add a unit test checking that memory usage is
reduced when a `fetch` is provided.
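The accounting in this hunk, and the kind of unit test suggested in the review, can be sketched in isolation. This is a minimal, self-contained sketch that does not use DataFusion's APIs: `MemTracker`, `grow`, `shrink_after_fetch`, and `InternalError` are hypothetical stand-ins for the sorter's memory reservation (`self.shrink`), the `mem_used` metric, and `DataFusionError::Internal`. It only illustrates the `checked_sub` pattern: release the difference between the input size and the smaller sorted size, and report an error instead of underflowing if the sorted batch turns out larger.

```rust
/// Hypothetical stand-in for `DataFusionError::Internal`.
#[derive(Debug, PartialEq)]
pub struct InternalError(pub String);

/// Hypothetical, simplified bookkeeping: `reserved` plays the role of the
/// memory reservation and `mem_used` the role of the metric.
#[derive(Debug, Default)]
pub struct MemTracker {
    reserved: usize,
    mem_used: usize,
}

impl MemTracker {
    /// Record that `bytes` were buffered for an unsorted input batch.
    pub fn grow(&mut self, bytes: usize) {
        self.reserved += bytes;
        self.mem_used += bytes;
    }

    /// After sorting with a `fetch` limit, release `size - new_size` bytes.
    /// Assumes `size` bytes were previously recorded via `grow`.
    /// Returns the number of bytes released, or an error (leaving the
    /// counters untouched) if the sorted batch is larger than the input.
    pub fn shrink_after_fetch(
        &mut self,
        size: usize,
        new_size: usize,
    ) -> Result<usize, InternalError> {
        let size_delta = size.checked_sub(new_size).ok_or_else(|| {
            // Arguments ordered so the message reads "sorted > input".
            InternalError(format!(
                "The size of the sorted batch is larger than the size of the input batch: {} > {}",
                new_size, size
            ))
        })?;
        self.reserved -= size_delta;
        self.mem_used -= size_delta;
        Ok(size_delta)
    }

    pub fn reserved(&self) -> usize {
        self.reserved
    }

    pub fn mem_used(&self) -> usize {
        self.mem_used
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn memory_is_reduced_when_fetch_is_provided() {
        let mut t = MemTracker::default();
        t.grow(1000); // unsorted input batch
        assert_eq!(t.shrink_after_fetch(1000, 400), Ok(600));
        assert_eq!(t.reserved(), 400);
        assert_eq!(t.mem_used(), 400);
    }

    #[test]
    fn larger_sorted_batch_is_an_error() {
        let mut t = MemTracker::default();
        t.grow(400);
        assert!(t.shrink_after_fetch(400, 500).is_err());
        assert_eq!(t.reserved(), 400); // nothing released on error
    }
}
```

A test against the real code would more likely drive `ExternalSorter` with a `fetch` set and assert that its `mem_used` metric drops after insertion; the sketch above only models that bookkeeping.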