xudong963 commented on code in PR #20922:
URL: https://github.com/apache/datafusion/pull/20922#discussion_r2972684660
##########
datafusion/physical-plan/src/sorts/builder.rs:
##########
@@ -126,49 +131,56 @@ impl BatchBuilder {
&self.schema
}
- /// Drains the in_progress row indexes, and builds a new RecordBatch from them
- ///
- /// Will then drop any batches for which all rows have been yielded to the output
- ///
- /// Returns `None` if no pending rows
- pub fn build_record_batch(&mut self) -> Result<Option<RecordBatch>> {
- if self.is_empty() {
- return Ok(None);
- }
-
- let columns = (0..self.schema.fields.len())
+ /// Try to interleave all columns using the given index slice.
+ fn try_interleave_columns(
+ &self,
+ indices: &[(usize, usize)],
+ ) -> Result<Vec<ArrayRef>> {
+ (0..self.schema.fields.len())
.map(|column_idx| {
let arrays: Vec<_> = self
.batches
.iter()
.map(|(_, batch)| batch.column(column_idx).as_ref())
.collect();
- Ok(interleave(&arrays, &self.indices)?)
+ recover_offset_overflow_from_panic(|| interleave(&arrays, indices))
})
- .collect::<Result<Vec<_>>>()?;
-
- self.indices.clear();
-
- // New cursors are only created once the previous cursor for the stream
- // is finished. This means all remaining rows from all but the last batch
- // for each stream have been yielded to the newly created record batch
- //
- // We can therefore drop all but the last batch for each stream
- let mut batch_idx = 0;
- let mut retained = 0;
- self.batches.retain(|(stream_idx, batch)| {
- let stream_cursor = &mut self.cursors[*stream_idx];
- let retain = stream_cursor.batch_idx == batch_idx;
- batch_idx += 1;
-
- if retain {
- stream_cursor.batch_idx = retained;
- retained += 1;
- } else {
- self.batches_mem_used -= get_record_batch_memory_size(batch);
- }
- retain
- });
+ .collect::<Result<Vec<_>>>()
+ }
+
+ /// Builds a record batch from the first `rows_to_emit` buffered rows.
+ fn finish_record_batch(
+ &mut self,
+ rows_to_emit: usize,
+ columns: Vec<ArrayRef>,
+ ) -> Result<RecordBatch> {
+ // Remove consumed indices, keeping any remaining for the next call.
+ self.indices.drain(..rows_to_emit);
+
+ // Only clean up fully-consumed batches when all indices are drained,
Review Comment:
Good point. I've added a comment clarifying that this is intentional: the
retention is bounded and short-lived, since leftover rows drain over subsequent
polls. Doing a precise scan of the remaining indices to release individual
batches would add complexity to the hot path for marginal gain.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]