kosiew commented on code in PR #20480:
URL: https://github.com/apache/datafusion/pull/20480#discussion_r2944817890
##########
datafusion/physical-plan/src/joins/sort_merge_join/stream.rs:
##########
@@ -1807,3 +1805,93 @@ fn is_join_arrays_equal(
}
Ok(true)
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use arrow::datatypes::{Field, Schema};
+ use arrow::ipc::writer::StreamWriter;
+ use datafusion_execution::disk_manager::DiskManager;
+
+ /// Creates a `BufferedBatch` with `BufferedBatchState::Spilled` by writing
+ /// the given batch to an IPC temp file.
+ fn make_spilled_batch(batch: &RecordBatch) -> Result<BufferedBatch> {
Review Comment:
`make_spilled_batch` writes IPC files directly with `StreamWriter`, which
is fine for this test, but using the existing spill machinery (SpillManager)
would keep the test closer to the production path.
##########
datafusion/physical-plan/src/joins/sort_merge_join/stream.rs:
##########
@@ -1807,3 +1805,93 @@ fn is_join_arrays_equal(
}
Ok(true)
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use arrow::datatypes::{Field, Schema};
+ use arrow::ipc::writer::StreamWriter;
+ use datafusion_execution::disk_manager::DiskManager;
+
+ /// Creates a `BufferedBatch` with `BufferedBatchState::Spilled` by writing
+ /// the given batch to an IPC temp file.
+ fn make_spilled_batch(batch: &RecordBatch) -> Result<BufferedBatch> {
+ let disk_manager = Arc::new(DiskManager::builder().build()?);
+ let spill_file = disk_manager.create_tmp_file("test_spill")?;
+ {
+ let file = File::create(spill_file.path())?;
+ let mut writer = StreamWriter::try_new(file, &batch.schema())?;
+ writer.write(batch)?;
+ writer.finish()?;
+ }
+ Ok(BufferedBatch {
+ batch: BufferedBatchState::Spilled(spill_file),
+ range: 0..batch.num_rows(),
+ join_arrays: vec![],
+ null_joined: vec![],
+ size_estimation: 0,
+ join_filter_not_matched_map: HashMap::new(),
+ num_rows: batch.num_rows(),
+ })
+ }
+
+ /// Verifies the spill path in `fetch_right_columns_from_batch_by_idxs`
+ /// produces identical results to the in-memory path.
+ ///
+ /// This catches a prior bug where the spill path used
+ /// `vec.extend(take(...))`. Since `Result` implements `IntoIterator`
+ /// (yielding 0 items on `Err`), any `take` error would silently drop
+ /// that column from the output instead of propagating the error.
+ #[test]
+ fn spill_path_matches_in_memory_path() -> Result<()> {
Review Comment:
The new regression test only covers the happy path.
It does not exercise the silent error-swallowing bug that this patch fixes.
The previous bug appeared when take(...) returned Err. Because Result<T, E>
implements IntoIterator, vec.extend(result) would quietly add zero items
instead of failing.
In this test, all indices are in bounds, so every take(...) succeeds. The
old buggy implementation would also pass this test.
Please add a failure-path case. Use an out-of-bounds index (or another way
to make take return an error).
The test should verify that the spilled path returns an error rather than a
truncated column list.
##########
datafusion/physical-plan/src/joins/sort_merge_join/stream.rs:
##########
@@ -1637,16 +1637,14 @@ fn fetch_right_columns_from_batch_by_idxs(
}
// If the batch was spilled to disk, less likely
BufferedBatchState::Spilled(spill_file) => {
- let mut buffered_cols: Vec<ArrayRef> =
- Vec::with_capacity(buffered_indices.len());
-
let file = BufReader::new(File::open(spill_file.path())?);
let reader = StreamReader::try_new(file, None)?;
+ let mut buffered_cols: Vec<ArrayRef> = Vec::new();
for batch in reader {
- batch?.columns().iter().for_each(|column| {
- buffered_cols.extend(take(column, &buffered_indices, None))
- });
+ for column in batch?.columns() {
+ buffered_cols.push(take(column, buffered_indices, None)?);
Review Comment:
The spilled branch still reimplements the column-by-column take logic.
It would be clearer if both branches shared a small helper over &[ArrayRef].
Alternatively, the spilled branch could call take_arrays(batch.columns(),
buffered_indices, None) after reading the spilled batch.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]