alamb commented on code in PR #17105:
URL: https://github.com/apache/datafusion/pull/17105#discussion_r2264639473
##########
datafusion/physical-plan/src/coalesce_batches.rs:
##########
@@ -277,102 +275,41 @@ impl Stream for CoalesceBatchesStream {
}
}
-/// Enumeration of possible states for `CoalesceBatchesStream`.
-/// It represents different stages in the lifecycle of a stream of record
batches.
-///
-/// An example of state transition:
-/// Notation:
-/// `[3000]`: A batch with size 3000
-/// `{[2000], [3000]}`: `CoalesceBatchStream`'s internal buffer with 2 batches
buffered
-/// Input of `CoalesceBatchStream` will generate three batches `[2000],
[3000], [4000]`
-/// The coalescing procedure will go through the following steps with 4096
coalescing threshold:
-/// 1. Read the first batch and get it buffered.
-/// - initial state: `Pull`
-/// - initial buffer: `{}`
-/// - updated buffer: `{[2000]}`
-/// - next state: `Pull`
-/// 2. Read the second batch, the coalescing target is reached since 2000 +
3000 > 4096
-/// - initial state: `Pull`
-/// - initial buffer: `{[2000]}`
-/// - updated buffer: `{[2000], [3000]}`
-/// - next state: `ReturnBuffer`
-/// 4. Two batches in the batch get merged and consumed by the upstream
operator.
-/// - initial state: `ReturnBuffer`
-/// - initial buffer: `{[2000], [3000]}`
-/// - updated buffer: `{}`
-/// - next state: `Pull`
-/// 5. Read the third input batch.
-/// - initial state: `Pull`
-/// - initial buffer: `{}`
-/// - updated buffer: `{[4000]}`
-/// - next state: `Pull`
-/// 5. The input is ended now. Jump to exhaustion state preparing the
finalized data.
-/// - initial state: `Pull`
-/// - initial buffer: `{[4000]}`
-/// - updated buffer: `{[4000]}`
-/// - next state: `Exhausted`
-#[derive(Debug, Clone, Eq, PartialEq)]
-enum CoalesceBatchesStreamState {
- /// State to pull a new batch from the input stream.
- Pull,
- /// State to return a buffered batch.
- ReturnBuffer,
- /// State indicating that the stream is exhausted.
- Exhausted,
-}
-
impl CoalesceBatchesStream {
fn poll_next_inner(
self: &mut Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<RecordBatch>>> {
let cloned_time = self.baseline_metrics.elapsed_compute().clone();
loop {
- match &self.inner_state {
- CoalesceBatchesStreamState::Pull => {
- // Attempt to pull the next batch from the input stream.
- let input_batch = ready!(self.input.poll_next_unpin(cx));
- // Start timing the operation. The timer records time upon
being dropped.
- let _timer = cloned_time.timer();
-
- match input_batch {
- Some(Ok(batch)) => match
self.coalescer.push_batch(batch) {
- CoalescerState::Continue => {}
- CoalescerState::LimitReached => {
- self.inner_state =
CoalesceBatchesStreamState::Exhausted;
- }
- CoalescerState::TargetReached => {
- self.inner_state =
- CoalesceBatchesStreamState::ReturnBuffer;
- }
- },
- None => {
- // End of input stream, but buffered batches might
still be present.
- self.inner_state =
CoalesceBatchesStreamState::Exhausted;
- }
- other => return Poll::Ready(other),
- }
- }
- CoalesceBatchesStreamState::ReturnBuffer => {
- let _timer = cloned_time.timer();
- // Combine buffered batches into one batch and return it.
- let batch = self.coalescer.finish_batch()?;
- // Set to pull state for the next iteration.
- self.inner_state = CoalesceBatchesStreamState::Pull;
- return Poll::Ready(Some(Ok(batch)));
+ // If there is any completed batch ready, return it
+ if let Some(batch) = self.coalescer.next_completed_batch() {
+ return Poll::Ready(Some(Ok(batch)));
+ }
+ if self.completed {
+ // If input is done and no batches are ready, return None to
signal end of stream.
+ return Poll::Ready(None);
+ }
+ // Attempt to pull the next batch from the input stream.
+ let input_batch = ready!(self.input.poll_next_unpin(cx));
+ // Start timing the operation. The timer records time upon being
dropped.
+ let _timer = cloned_time.timer();
+
+ match input_batch {
+ None => {
+ // Input stream is exhausted, finalize any remaining
batches
+ self.completed = true;
+ self.coalescer.finish()?;
}
- CoalesceBatchesStreamState::Exhausted => {
- // Handle the end of the input stream.
- return if self.coalescer.is_empty() {
- // If buffer is empty, return None indicating the
stream is fully consumed.
- Poll::Ready(None)
- } else {
- let _timer = cloned_time.timer();
- // If the buffer still contains batches, prepare to
return them.
- let batch = self.coalescer.finish_batch()?;
- Poll::Ready(Some(Ok(batch)))
- };
+ Some(Ok(batch)) => {
+ if self.coalescer.push_batch(batch)? {
Review Comment:
I found this API somewhat confusing: a `true` return value from `push_batch`
means the limit was reached, which is not obvious at the call site.
Returning an enum (for example, with `Continue` and `LimitReached` variants)
might be clearer here.
I don't think this is a correctness issue, just a readability thing I noticed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]