Copilot commented on code in PR #18207:
URL: https://github.com/apache/datafusion/pull/18207#discussion_r2494726186


##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1228,65 +1419,74 @@ impl Stream for RepartitionStream {
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
+        use futures::StreamExt;
+
         loop {
-            match &mut self.state {
-                RepartitionStreamState::ReceivingFromChannel => {
-                    let value = 
futures::ready!(self.input.recv().poll_unpin(cx));
+            match self.state {
+                StreamState::ReadingMemory => {
+                    // Poll the memory channel for next message
+                    let value = match self.input.recv().poll_unpin(cx) {
+                        Poll::Ready(v) => v,
+                        Poll::Pending => {
+                            // Nothing from channel, wait
+                            return Poll::Pending;
+                        }
+                    };
+
                     match value {
                         Some(Some(v)) => match v {
                             Ok(RepartitionBatch::Memory(batch)) => {
-                                // Release memory and return
+                                // Release memory and return batch
                                 self.reservation
                                     .lock()
                                     .shrink(batch.get_array_memory_size());
                                 return Poll::Ready(Some(Ok(batch)));
                             }
-                            Ok(RepartitionBatch::Spilled { spill_file, size }) 
=> {
-                                // Read from disk - SpillReaderStream uses 
tokio::fs internally
-                                // Pass the original size for validation
-                                let stream = self
-                                    .spill_manager
-                                    .read_spill_as_stream(spill_file, 
Some(size))?;
-                                self.state =
-                                    
RepartitionStreamState::ReadingSpilledBatch(stream);
-                                // Continue loop to poll the stream immediately
+                            Ok(RepartitionBatch::Spilled) => {
+                                // Batch was spilled, transition to reading 
from spill stream
+                                // We must block on spill stream until we get 
the batch
+                                // to preserve ordering
+                                self.state = StreamState::ReadingSpilled;
+                                continue;
                             }
                             Err(e) => {
                                 return Poll::Ready(Some(Err(e)));
                             }
                         },
                         Some(None) => {
-                            self.num_input_partitions_processed += 1;
-
-                            if self.num_input_partitions
-                                == self.num_input_partitions_processed
-                            {
-                                // all input partitions have finished sending 
batches
+                            // One input partition finished
+                            self.remaining_partitions -= 1;
+                            if self.remaining_partitions == 0 {
+                                // All input partitions finished
                                 return Poll::Ready(None);
-                            } else {
-                                // other partitions still have data to send
-                                continue;
                             }
+                            // Continue to poll for more data from other 
partitions
+                            continue;
                         }
                         None => {
+                            // Channel closed unexpectedly
                             return Poll::Ready(None);
                         }
                     }
                 }
-                RepartitionStreamState::ReadingSpilledBatch(stream) => {
-                    match futures::ready!(stream.poll_next_unpin(cx)) {
-                        Some(Ok(batch)) => {
-                            // Return batch and stay in ReadingSpilledBatch 
state to read more batches
+                StreamState::ReadingSpilled => {
+                    // Poll spill stream for the spilled batch
+                    match self.spill_stream.poll_next_unpin(cx) {
+                        Poll::Ready(Some(Ok(batch))) => {
+                            self.state = StreamState::ReadingMemory;
                             return Poll::Ready(Some(Ok(batch)));
                         }
-                        Some(Err(e)) => {
-                            self.state = 
RepartitionStreamState::ReceivingFromChannel;
+                        Poll::Ready(Some(Err(e))) => {
                             return Poll::Ready(Some(Err(e)));
                         }
-                        None => {
-                            // Spill stream ended - go back to receiving from 
channel
-                            self.state = 
RepartitionStreamState::ReceivingFromChannel;
-                            continue;
+                        Poll::Ready(None) => {
+                            // Spill stream ended keep draining the memory 
channel

Review Comment:
   Missing punctuation. The comment should have a comma after 'ended' for 
proper grammar: 'Spill stream ended, keep draining the memory channel'.
   ```suggestion
                               // Spill stream ended, keep draining the memory channel
   ```



##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1331,55 +1550,67 @@ impl Stream for PerPartitionStream {
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
+        use futures::StreamExt;
+
         loop {
-            match &mut self.state {
-                RepartitionStreamState::ReceivingFromChannel => {
-                    let value = 
futures::ready!(self.receiver.recv().poll_unpin(cx));
+            match self.state {
+                StreamState::ReadingMemory => {
+                    // Poll the memory channel for next message
+                    let value = match self.receiver.recv().poll_unpin(cx) {
+                        Poll::Ready(v) => v,
+                        Poll::Pending => {
+                            // Nothing from channel, wait
+                            return Poll::Pending;
+                        }
+                    };
+
                     match value {
                         Some(Some(v)) => match v {
                             Ok(RepartitionBatch::Memory(batch)) => {
-                                // Release memory and return
+                                // Release memory and return batch
                                 self.reservation
                                     .lock()
                                     .shrink(batch.get_array_memory_size());
                                 return Poll::Ready(Some(Ok(batch)));
                             }
-                            Ok(RepartitionBatch::Spilled { spill_file, size }) 
=> {
-                                // Read from disk - SpillReaderStream uses 
tokio::fs internally
-                                // Pass the original size for validation
-                                let stream = self
-                                    .spill_manager
-                                    .read_spill_as_stream(spill_file, 
Some(size))?;
-                                self.state =
-                                    
RepartitionStreamState::ReadingSpilledBatch(stream);
-                                // Continue loop to poll the stream immediately
+                            Ok(RepartitionBatch::Spilled) => {
+                                // Batch was spilled, transition to reading 
from spill stream
+                                // We must block on spill stream until we get 
the batch
+                                // to preserve ordering
+                                self.state = StreamState::ReadingSpilled;
+                                continue;
                             }
                             Err(e) => {
                                 return Poll::Ready(Some(Err(e)));
                             }
                         },
                         Some(None) => {
-                            // Input partition has finished sending batches
                             return Poll::Ready(None);
                         }
-                        None => return Poll::Ready(None),
+                        None => {
+                            // Channel closed unexpectedly
+                            return Poll::Ready(None);
+                        }
                     }
                 }
-
-                RepartitionStreamState::ReadingSpilledBatch(stream) => {
-                    match futures::ready!(stream.poll_next_unpin(cx)) {
-                        Some(Ok(batch)) => {
-                            // Return batch and stay in ReadingSpilledBatch 
state to read more batches
+                StreamState::ReadingSpilled => {
+                    // Poll spill stream for the spilled batch
+                    match self.spill_stream.poll_next_unpin(cx) {
+                        Poll::Ready(Some(Ok(batch))) => {
+                            self.state = StreamState::ReadingMemory;
                             return Poll::Ready(Some(Ok(batch)));
                         }
-                        Some(Err(e)) => {
-                            self.state = 
RepartitionStreamState::ReceivingFromChannel;
+                        Poll::Ready(Some(Err(e))) => {
                             return Poll::Ready(Some(Err(e)));
                         }
-                        None => {
-                            // Spill stream ended - go back to receiving from 
channel
-                            self.state = 
RepartitionStreamState::ReceivingFromChannel;
-                            continue;
+                        Poll::Ready(None) => {
+                            // Spill stream ended keep draining the memory 
channel

Review Comment:
   Missing punctuation. The comment should have a comma after 'ended' for 
proper grammar: 'Spill stream ended, keep draining the memory channel'.
   ```suggestion
                               // Spill stream ended, keep draining the memory channel
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to