alamb commented on code in PR #15562:
URL: https://github.com/apache/datafusion/pull/15562#discussion_r2027641047
##########
datafusion/physical-plan/src/sorts/merge.rs:
##########
@@ -241,10 +239,13 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
_ => {
// If the polling result is Poll::Ready(Some(batch)) or Poll::Ready(None),
// we remove this partition from the queue so it is not polled again.
- self.uninitiated_partitions.retain(|idx| *idx != i);
+ self.uninitiated_partitions.pop_front();
}
}
}
+
+ // Claim the memory for the uninitiated partitions
Review Comment:
It might make more sense to simply reset the length of the fixed-size queue
back so it is full and the partitions are polled again next time.
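Here is a minimal sketch of that suggestion. It assumes a simplified `FixedSizeQueue` that keeps only the `values`, `start` and `len` fields from the struct added in this PR. The `new`, `len` and `reset` methods are hypothetical names, not the PR's API. Popping only advances `start` and never drops `values`, so restoring `start` and `len` makes every partition eligible for polling again without reallocating.

```rust
/// Simplified stand-in for the PR's `FixedSizeQueue` (a circular buffer over
/// partition indices). Only the fields this sketch needs are kept.
#[derive(Debug)]
struct FixedSizeQueue {
    values: Vec<usize>,
    start: usize,
    len: usize,
}

impl FixedSizeQueue {
    fn new(values: Vec<usize>) -> Self {
        let len = values.len();
        Self { values, start: 0, len }
    }

    fn len(&self) -> usize {
        self.len
    }

    /// Removes the front element by advancing `start`; `values` is untouched.
    fn pop_front(&mut self) -> Option<usize> {
        if self.len == 0 {
            return None;
        }
        let v = self.values[self.start];
        self.start = (self.start + 1) % self.values.len();
        self.len -= 1;
        Some(v)
    }

    /// Hypothetical: make the queue "full" again so every partition is polled
    /// on the next pass. No allocation, because `values` was never dropped.
    fn reset(&mut self) {
        self.start = 0;
        self.len = self.values.len();
    }
}
```

After draining the queue with `pop_front`, calling `reset` sets `len` back to `values.len()`. The next `pop_front` then returns the first partition again.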
##########
datafusion/physical-plan/src/sorts/merge.rs:
##########
@@ -533,3 +534,134 @@ impl<C: CursorValues + Unpin> RecordBatchStream for SortPreservingMergeStream<C>
Arc::clone(self.in_progress.schema())
}
}
+
+/// Fixed size queue implemented as a circular buffer
+/// The underlying `values` are immutable, so removing elements will not drop them and not change the
+/// capacity/length of the actual vector
+#[derive(Debug, Default)]
+struct FixedSizeQueue {
+ values: Vec<usize>,
+ start: usize,
+ end: usize,
+ len: usize,
+}
+
+impl From<Vec<usize>> for FixedSizeQueue {
+ fn from(values: Vec<usize>) -> Self {
+ let len = values.len();
+
+ Self {
+ values,
+ start: 0,
+ end: len.saturating_sub(1),
+ len,
+ }
+ }
+}
+
+impl FixedSizeQueue {
+ /// Get the value at the top of the queue
+ ///
+ /// # Implementation
+ /// return the value at [`Self::start`]
+ ///
+ /// ## Example
+ ///
+ /// ```plain
+ /// index: | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |
+ /// | -------------------------------------------------------- |
+ /// value: | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |
Review Comment:
I would find these examples easier to follow if `index` and `value` didn't
have the same values. Maybe something like:
```suggestion
/// value: | 9 | 8 | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
```
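This hedged sketch shows why distinct values help. It uses simplified fields, and `front` is a hypothetical name for the "value at the top of the queue" accessor. Here `values` is the reversed sequence from the suggestion. With `values = [9, 8, ..., 0]`, two calls to `pop_front` move `start` to `2`, and `front` then returns `7`. That makes it clear the accessor reads `values[start]` rather than returning `start` itself.

```rust
/// Simplified circular-buffer queue over partition indices (sketch only).
struct FixedSizeQueue {
    values: Vec<usize>,
    start: usize,
    len: usize,
}

impl FixedSizeQueue {
    fn new(values: Vec<usize>) -> Self {
        let len = values.len();
        Self { values, start: 0, len }
    }

    /// Hypothetical accessor: returns the value stored at index `start`,
    /// not `start` itself, or `None` when the queue is empty.
    fn front(&self) -> Option<usize> {
        (self.len > 0).then(|| self.values[self.start])
    }

    /// Advances `start` (wrapping around) without touching `values`.
    fn pop_front(&mut self) -> Option<usize> {
        let v = self.front()?;
        self.start = (self.start + 1) % self.values.len();
        self.len -= 1;
        Some(v)
    }
}
```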
##########
datafusion/physical-plan/src/sorts/merge.rs:
##########
@@ -241,10 +239,13 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
_ => {
// If the polling result is Poll::Ready(Some(batch)) or Poll::Ready(None),
// we remove this partition from the queue so it is not polled again.
- self.uninitiated_partitions.retain(|idx| *idx != i);
+ self.uninitiated_partitions.pop_front();
}
}
}
+
+ // Claim the memory for the uninitiated partitions
Review Comment:
This effectively resets the state of `uninitiated_partitions` to an empty
`FixedSizeQueue`, as I understand it.
Doesn't that mean that after the first time the loser tree is empty, it will
never enter the loop above again to start polling?
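This small model illustrates the concern. It uses `std::collections::VecDeque` as a stand-in for `FixedSizeQueue`. A hypothetical `poll_pass` function replaces the real polling loop, which is not shown in this hunk. A hypothetical `claim` function models the "claim the memory" step with `std::mem::take`, which leaves an empty queue behind.

```rust
use std::collections::VecDeque;

/// Hypothetical model of one polling pass over the uninitiated partitions:
/// poll from the front, drop each partition that is ready, and stop at the
/// first one that is still pending so it is retried on the next pass.
fn poll_pass(queue: &mut VecDeque<usize>, is_ready: impl Fn(usize) -> bool) -> Vec<usize> {
    let mut polled = Vec::new();
    while let Some(&partition) = queue.front() {
        polled.push(partition);
        if is_ready(partition) {
            queue.pop_front();
        } else {
            break;
        }
    }
    polled
}

/// Models "claim the memory for the uninitiated partitions": the buffer is
/// moved out and the queue left behind is empty.
fn claim(queue: &mut VecDeque<usize>) -> VecDeque<usize> {
    std::mem::take(queue)
}
```

Start from partitions `0..3` and run a first pass in which partition `1` is still pending. That pass polls `[0, 1]`. After `claim`, a second pass polls nothing, even if every partition would now be ready. Restoring the queue avoids this, for example by resetting it to full as suggested in the earlier comment.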
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.