pantShrey commented on code in PR #22230:
URL: https://github.com/apache/datafusion/pull/22230#discussion_r3281014293
##########
datafusion/physical-plan/src/joins/sort_merge_join/bitwise_stream.rs:
##########
@@ -785,24 +794,40 @@ impl BitwiseSortMergeJoinStream {
)
.count_ones();
- // Process spilled inner batches first (read back from disk).
- if let Some(spill_file) = &self.inner_key_spill {
- let file = BufReader::new(File::open(spill_file.path())?);
- let reader = StreamReader::try_new(file, None)?;
- for batch_result in reader {
- let inner_slice = batch_result?;
- matched_count = eval_filter_for_inner_slice(
- self.outer_is_left,
- filter,
- &outer_slice,
- &inner_slice,
- &mut self.matched,
- self.outer_offset,
- outer_group_len,
- matched_count,
- )?;
- if matched_count == outer_group_len {
- break;
+ // Process spilled inner batches first asynchronously.
+ if self.inner_key_spill.is_some() || self.spill_stream.is_some() {
+ if self.spill_stream.is_none()
+ && let Some(spill_file) = &self.inner_key_spill
+ {
+ let stream = self
+ .spill_manager
+ .read_spill_as_stream(spill_file.clone(), None)?;
+ self.spill_stream = Some(stream);
+ }
+
+ while matched_count < outer_group_len {
+ let stream = self.spill_stream.as_mut().unwrap();
+ match ready!(stream.poll_next_unpin(cx)) {
+ Some(Ok(inner_slice)) => {
+ matched_count = eval_filter_for_inner_slice(
+ self.outer_is_left,
+ filter,
+ &outer_slice,
+ &inner_slice,
+ &mut self.matched,
+ self.outer_offset,
+ outer_group_len,
+ matched_count,
+ )?;
+ }
+ Some(Err(e)) => {
+ self.spill_stream = None;
+ return Poll::Ready(Err(e));
+ }
+ None => {
Review Comment:
You are right. Although `None` is needed here to signal the normal end of
the stream, it should have a guard against an unexpectedly empty stream.
I've added a `spill_stream_has_data: bool` flag to the struct. `None`
remains the normal EOF once batches have been read, but it now fires an
`internal_err!` if the very first poll returns `None`.
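The guard described above can be sketched in std-only Rust. This is a minimal, hypothetical model, not the PR's code: `ScriptedStream` stands in for the spill `SendableRecordBatchStream`, batches are `Vec<i32>`, errors are `String`, and the `Err` returned on an empty stream stands in for `internal_err!`. `SpillReader` and `rows_seen` are illustrative names. The flag reset at EOF is an assumption so that a reopened stream is checked again. The loop resumes across `Pending` the same way the `ready!`-based loop in the hunk does.

```rust
use std::collections::VecDeque;
use std::task::{ready, Poll};

/// Hypothetical stand-in for the spill stream: each poll returns the next
/// scripted result, then end-of-stream once the script is exhausted.
struct ScriptedStream {
    script: VecDeque<Poll<Option<Result<Vec<i32>, String>>>>,
}

impl ScriptedStream {
    fn poll_next(&mut self) -> Poll<Option<Result<Vec<i32>, String>>> {
        self.script.pop_front().unwrap_or(Poll::Ready(None))
    }
}

/// Hypothetical state holder mirroring `spill_stream` / `spill_stream_has_data`.
struct SpillReader {
    spill_stream: Option<ScriptedStream>,
    spill_stream_has_data: bool,
    rows_seen: usize,
}

impl SpillReader {
    /// Drains the spill stream, returning `Pending` (and resuming on the next
    /// call) whenever the stream is not ready.
    fn poll_drain(&mut self) -> Poll<Result<usize, String>> {
        loop {
            let stream = match self.spill_stream.as_mut() {
                Some(s) => s,
                None => return Poll::Ready(Ok(self.rows_seen)),
            };
            // `ready!` returns `Poll::Pending` early, like in the diff.
            match ready!(stream.poll_next()) {
                Some(Ok(batch)) => {
                    // At least one batch arrived, so a later `None` is a normal EOF.
                    self.spill_stream_has_data = true;
                    self.rows_seen += batch.len();
                }
                Some(Err(e)) => {
                    self.spill_stream = None;
                    return Poll::Ready(Err(e));
                }
                None => {
                    self.spill_stream = None;
                    if !self.spill_stream_has_data {
                        // Stand-in for `internal_err!`: first poll hit EOF.
                        return Poll::Ready(Err(
                            "internal error: spill stream yielded no batches".to_string(),
                        ));
                    }
                    // Assumption: reset so a reopened stream is checked again.
                    self.spill_stream_has_data = false;
                    return Poll::Ready(Ok(self.rows_seen));
                }
            }
        }
    }
}
```

In this model, a stream scripted as `[Pending, batch(2 rows), batch(1 row)]` returns `Pending` on the first call and `Ready(Ok(3))` on the second, while an empty script fails immediately instead of silently reporting zero matches.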