viirya commented on code in PR #10892:
URL: https://github.com/apache/datafusion/pull/10892#discussion_r1638444191


##########
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##########
@@ -1098,49 +1107,52 @@ impl SMJStream {
     //   2. freezes NULLs joined to dequeued buffered batch to "release" it
     fn freeze_dequeuing_buffered(&mut self) -> Result<()> {
         self.freeze_streamed()?;
-        self.freeze_buffered(1)?;
+        // Only freeze and produce the first batch in buffered_data as the 
batch is fully processed
+        self.freeze_buffered(1, true)?;
         Ok(())
     }
 
     // Produces and stages record batch from buffered indices with 
corresponding
     // NULLs on streamed side.
     //
     // Applicable only in case of Full join.
-    fn freeze_buffered(&mut self, batch_count: usize) -> Result<()> {
+    fn freeze_buffered(
+        &mut self,
+        batch_count: usize,
+        output_join_filter_fail_batch: bool,
+    ) -> Result<()> {
         if !matches!(self.join_type, JoinType::Full) {
             return Ok(());
         }
         for buffered_batch in 
self.buffered_data.batches.range_mut(..batch_count) {
             let buffered_indices = UInt64Array::from_iter_values(
                 buffered_batch.null_joined.iter().map(|&index| index as u64),
             );
-            if buffered_indices.is_empty() {
-                continue;
+            if let Some(record_batch) = produce_buffered_null_batch(
+                &self.schema,
+                &self.streamed_schema,
+                &buffered_indices,
+                buffered_batch,
+            )? {
+                self.output_record_batches.push(record_batch);
             }
             buffered_batch.null_joined.clear();
 
-            // Take buffered (right) columns
-            let buffered_columns = buffered_batch
-                .batch
-                .columns()
-                .iter()
-                .map(|column| take(column, &buffered_indices, None))
-                .collect::<Result<Vec<_>, ArrowError>>()
-                .map_err(Into::<DataFusionError>::into)?;
-
-            // Create null streamed (left) columns
-            let mut streamed_columns = self
-                .streamed_schema
-                .fields()
-                .iter()
-                .map(|f| new_null_array(f.data_type(), buffered_indices.len()))
-                .collect::<Vec<_>>();
-
-            streamed_columns.extend(buffered_columns);
-            let columns = streamed_columns;
-
-            self.output_record_batches
-                .push(RecordBatch::try_new(self.schema.clone(), columns)?);
+            // For buffered rows which are joined with streamed side but 
failed on join filter
+            if output_join_filter_fail_batch {
+                let buffered_indices = UInt64Array::from_iter_values(
+                    buffered_batch.join_filter_failed_idxs.iter().copied(),
+                );
+                if let Some(record_batch) = produce_buffered_null_batch(

Review Comment:
   I have already pulled the common logic out into `produce_buffered_null_batch`.
   The call is duplicated because the producing logic is the same in both cases.
   
   Do you mean inlining the `produce_buffered_null_batch` code into the `for` loop?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to