SubhamSinghal commented on code in PR #22103:
URL: https://github.com/apache/datafusion/pull/22103#discussion_r3241172380


##########
datafusion/physical-plan/src/joins/sort_merge_join/materializing_stream.rs:
##########
@@ -1577,20 +1599,31 @@ impl MaterializingSortMergeJoinStream {
         let mut right_columns = Vec::with_capacity(num_right_cols);
 
         // Read each source batch once (spilled batches require disk I/O).
-        let source_data: Vec<Option<RecordBatch>> = source_batches
-            .iter()
-            .map(|&idx| {
-                let bb = &self.buffered_data.batches[idx];
-                match &bb.batch {
-                    BufferedBatchState::InMemory(batch) => Some(batch.clone()),
-                    BufferedBatchState::Spilled(spill_file) => {
-                        let file = BufReader::new(File::open(spill_file.path()).ok()?);
-                        let reader = StreamReader::try_new(file, None).ok()?;
-                        reader.into_iter().next()?.ok()
-                    }
+        // Track memory for each spilled batch at the point of deserialization
+        // so the pool reflects actual usage as it grows.
+        let mut spill_read_mem: usize = 0;
+        let mut source_data: Vec<Option<RecordBatch>> =
+            Vec::with_capacity(source_batches.len());
+        for &idx in &source_batches {
+            let bb = &self.buffered_data.batches[idx];
+            match &bb.batch {
+                BufferedBatchState::InMemory(batch) => {
+                    source_data.push(Some(batch.clone()));
                 }
-            })
-            .collect();
+                BufferedBatchState::Spilled(spill_file) => {
+                    let batch_mem = bb.size_estimation;
+                    self.reservation.grow(batch_mem);
+                    self.join_metrics
+                        .peak_mem_used()
+                        .set_max(self.reservation.size());
+                    spill_read_mem += batch_mem;
+
+                    let file = BufReader::new(File::open(spill_file.path())?);

Review Comment:
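   @kosiew Thanks for highlighting this. I switched from manual `grow`/`shrink`
calls on `self.reservation` to a scoped `MemoryReservation` created via
`self.reservation.new_empty()`, so the memory tracked for spilled batches is
released when the scoped reservation is dropped, including on early-return
error paths.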



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to