This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 44daa9a7b8 minor: enhance comment in SortPreservingMergeStream.abort
(#17115)
44daa9a7b8 is described below
commit 44daa9a7b86f39b324762f93af901b292e98008d
Author: mwish <[email protected]>
AuthorDate: Wed Aug 13 01:55:40 2025 +0800
minor: enhance comment in SortPreservingMergeStream.abort (#17115)
* minor: enhance comment in SortPreservingMergeStream.abort
* change aborted to done
---
datafusion/physical-plan/src/sorts/merge.rs | 15 ++++++++-------
1 file changed, 8 insertions(+), 7 deletions(-)
diff --git a/datafusion/physical-plan/src/sorts/merge.rs
b/datafusion/physical-plan/src/sorts/merge.rs
index ca2d5f2105..0b0136cd12 100644
--- a/datafusion/physical-plan/src/sorts/merge.rs
+++ b/datafusion/physical-plan/src/sorts/merge.rs
@@ -49,8 +49,9 @@ pub(crate) struct SortPreservingMergeStream<C: CursorValues> {
/// used to record execution metrics
metrics: BaselineMetrics,
- /// If the stream has encountered an error
- aborted: bool,
+ /// If the stream has encountered an error or reaches the
+ /// `fetch` limit.
+ done: bool,
/// A loser tree that always produces the minimum cursor
///
@@ -162,7 +163,7 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
in_progress: BatchBuilder::new(schema, stream_count, batch_size,
reservation),
streams,
metrics,
- aborted: false,
+ done: false,
cursors: (0..stream_count).map(|_| None).collect(),
prev_cursors: (0..stream_count).map(|_| None).collect(),
round_robin_tie_breaker_mode: false,
@@ -206,7 +207,7 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<RecordBatch>>> {
- if self.aborted {
+ if self.done {
return Poll::Ready(None);
}
// Once all partitions have set their corresponding cursors for the
loser tree,
@@ -220,7 +221,7 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
let partition_idx = self.uninitiated_partitions[idx];
match self.maybe_poll_stream(cx, partition_idx) {
Poll::Ready(Err(e)) => {
- self.aborted = true;
+ self.done = true;
return Poll::Ready(Some(Err(e)));
}
Poll::Pending => {
@@ -268,7 +269,7 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
if !self.loser_tree_adjusted {
let winner = self.loser_tree[0];
if let Err(e) = ready!(self.maybe_poll_stream(cx, winner)) {
- self.aborted = true;
+ self.done = true;
return Poll::Ready(Some(Err(e)));
}
self.update_loser_tree();
@@ -281,7 +282,7 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
// stop sorting if fetch has been reached
if self.fetch_reached() {
- self.aborted = true;
+ self.done = true;
} else if self.in_progress.len() < self.batch_size {
continue;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]