viirya commented on code in PR #10892: URL: https://github.com/apache/datafusion/pull/10892#discussion_r1638444191
########## datafusion/physical-plan/src/joins/sort_merge_join.rs: ########## @@ -1098,49 +1107,52 @@ impl SMJStream { // 2. freezes NULLs joined to dequeued buffered batch to "release" it fn freeze_dequeuing_buffered(&mut self) -> Result<()> { self.freeze_streamed()?; - self.freeze_buffered(1)?; + // Only freeze and produce the first batch in buffered_data as the batch is fully processed + self.freeze_buffered(1, true)?; Ok(()) } // Produces and stages record batch from buffered indices with corresponding // NULLs on streamed side. // // Applicable only in case of Full join. - fn freeze_buffered(&mut self, batch_count: usize) -> Result<()> { + fn freeze_buffered( + &mut self, + batch_count: usize, + output_join_filter_fail_batch: bool, + ) -> Result<()> { if !matches!(self.join_type, JoinType::Full) { return Ok(()); } for buffered_batch in self.buffered_data.batches.range_mut(..batch_count) { let buffered_indices = UInt64Array::from_iter_values( buffered_batch.null_joined.iter().map(|&index| index as u64), ); - if buffered_indices.is_empty() { - continue; + if let Some(record_batch) = produce_buffered_null_batch( + &self.schema, + &self.streamed_schema, + &buffered_indices, + buffered_batch, + )? 
{ + self.output_record_batches.push(record_batch); } buffered_batch.null_joined.clear(); - // Take buffered (right) columns - let buffered_columns = buffered_batch - .batch - .columns() - .iter() - .map(|column| take(column, &buffered_indices, None)) - .collect::<Result<Vec<_>, ArrowError>>() - .map_err(Into::<DataFusionError>::into)?; - - // Create null streamed (left) columns - let mut streamed_columns = self - .streamed_schema - .fields() - .iter() - .map(|f| new_null_array(f.data_type(), buffered_indices.len())) - .collect::<Vec<_>>(); - - streamed_columns.extend(buffered_columns); - let columns = streamed_columns; - - self.output_record_batches - .push(RecordBatch::try_new(self.schema.clone(), columns)?); + // For buffered rows which are joined with streamed side but failed on join filter + if output_join_filter_fail_batch { + let buffered_indices = UInt64Array::from_iter_values( + buffered_batch.join_filter_failed_idxs.iter().copied(), + ); + if let Some(record_batch) = produce_buffered_null_batch( Review Comment: I have already pulled the common logic out into `produce_buffered_null_batch`. The call is duplicated because the producing logic is the same for both cases. Do you mean I should inline the `produce_buffered_null_batch` code in the `for` loop? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org