Copilot commented on code in PR #877:
URL: https://github.com/apache/mahout/pull/877#discussion_r2707315470
##########
qdp/qdp-core/src/readers/numpy.rs:
##########
@@ -155,9 +428,260 @@ impl DataReader for NumpyReader {
}
}
+/// Streaming reader for NumPy `.npy` files containing 2D float64 arrays.
+///
+/// Reads data in chunks without loading the entire file into memory.
+pub struct NumpyStreamingReader {
+ file: File,
+ header: NpyHeader,
+ row_cursor: usize,
+ column_buf: Vec<f64>,
+}
+
+impl NumpyStreamingReader {
+ /// Create a new streaming NumPy reader.
+ pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+ let path = path.as_ref();
+
+ match path.try_exists() {
+ Ok(false) => {
+ return Err(MahoutError::Io(format!(
+ "NumPy file not found: {}",
+ path.display()
+ )));
+ }
+ Err(e) => {
+ return Err(MahoutError::Io(format!(
+ "Failed to check if NumPy file exists at {}: {}",
+ path.display(),
+ e
+ )));
+ }
+ Ok(true) => {}
+ }
+
+ let mut file = File::open(path)
+ .map_err(|e| MahoutError::Io(format!("Failed to open NumPy file:
{}", e)))?;
+ let header = read_npy_header(path, &mut file)?;
+
+ Ok(Self {
+ file,
+ header,
+ row_cursor: 0,
+ column_buf: Vec::new(),
+ })
+ }
+}
+
+impl DataReader for NumpyStreamingReader {
+ fn read_batch(&mut self) -> Result<(Vec<f64>, usize, usize)> {
+ let total_elements = self.header.total_elements();
+ let mut data = vec![0.0; total_elements];
+ let mut written = 0;
+ while written < total_elements {
+ let n = self.read_chunk(&mut data[written..])?;
+ if n == 0 {
+ break;
+ }
+ written += n;
+ }
+ if written != total_elements {
+ data.truncate(written);
+ }
+
+ Ok((data, self.header.num_samples, self.header.sample_size))
+ }
+
+ fn get_sample_size(&self) -> Option<usize> {
+ Some(self.header.sample_size)
+ }
+
+ fn get_num_samples(&self) -> Option<usize> {
+ Some(self.header.num_samples)
+ }
+}
+
+impl StreamingDataReader for NumpyStreamingReader {
+ fn read_chunk(&mut self, buffer: &mut [f64]) -> Result<usize> {
+ if self.row_cursor >= self.header.num_samples {
+ return Ok(0);
+ }
+
+ let sample_size = self.header.sample_size;
+ let max_rows = buffer.len() / sample_size;
+ if max_rows == 0 {
+ return Err(MahoutError::InvalidInput(format!(
+ "Buffer too small for one sample (need {} elements)",
+ sample_size
+ )));
+ }
+
+ let remaining_rows = self.header.num_samples - self.row_cursor;
+ let rows_to_read = std::cmp::min(max_rows, remaining_rows);
+ let elem_count = rows_to_read * sample_size;
+
+ if !self.header.fortran_order {
+ let offset = self.header.data_offset
+ + (self.row_cursor * sample_size * std::mem::size_of::<f64>())
as u64;
+ read_f64s_at(&mut self.file, offset, &mut buffer[..elem_count])?;
+ } else {
+ if self.column_buf.len() < rows_to_read {
+ self.column_buf.resize(rows_to_read, 0.0);
+ }
+ for col in 0..sample_size {
+ let offset = self.header.data_offset
+ + ((col * self.header.num_samples + self.row_cursor)
+ * std::mem::size_of::<f64>()) as u64;
+ let column = &mut self.column_buf[..rows_to_read];
+ read_f64s_at(&mut self.file, offset, column)?;
+ for row in 0..rows_to_read {
+ buffer[row * sample_size + col] = column[row];
+ }
+ }
Review Comment:
The Fortran-order reading path performs one seek and read per column (line
536), resulting in sample_size seek operations. For wide matrices with many
columns, this could cause significant performance degradation compared to
C-order reading. Consider documenting this performance characteristic in the
struct's documentation, or implementing a batched column read strategy to
reduce the number of seeks.
##########
qdp/qdp-core/src/readers/numpy.rs:
##########
@@ -18,13 +18,286 @@
//!
//! Provides support for reading .npy files containing 2D float64 arrays.
-use std::path::Path;
+use std::fs::File;
+use std::io::{Read, Seek, SeekFrom};
+use std::path::{Path, PathBuf};
+use memmap2::Mmap;
use ndarray::Array2;
use ndarray_npy::ReadNpyError;
use crate::error::{MahoutError, Result};
-use crate::reader::DataReader;
+use crate::reader::{DataReader, StreamingDataReader};
+
+const NPY_MAGIC: &[u8; 6] = b"\x93NUMPY";
+
+#[derive(Clone, Debug)]
+struct NpyHeader {
+ fortran_order: bool,
+ num_samples: usize,
+ sample_size: usize,
+ data_offset: u64,
+}
+
+impl NpyHeader {
+ fn total_elements(&self) -> usize {
+ self.num_samples * self.sample_size
+ }
+}
+
+fn parse_header_value<'a>(header: &'a str, key: &str) -> Result<&'a str> {
+ let key_single = format!("'{}'", key);
+ let mut start = header.find(&key_single);
+ if start.is_none() {
+ let key_double = format!("\"{}\"", key);
+ start = header.find(&key_double);
+ }
+ let start = start.ok_or_else(|| {
+ MahoutError::InvalidInput(format!("Missing '{}' entry in .npy header",
key))
+ })?;
+ let rest = &header[start..];
+ let colon = rest
+ .find(':')
+ .ok_or_else(|| MahoutError::InvalidInput("Malformed .npy
header".to_string()))?;
+ Ok(rest[colon + 1..].trim_start())
+}
+
+fn parse_quoted_value(header: &str, key: &str) -> Result<String> {
+ let rest = parse_header_value(header, key)?;
+ let mut chars = rest.chars();
+ let quote = chars
+ .next()
+ .ok_or_else(|| MahoutError::InvalidInput("Malformed .npy
header".to_string()))?;
+ if quote != '\'' && quote != '"' {
+ return Err(MahoutError::InvalidInput(format!(
+ "Expected quoted value for '{}'",
+ key
+ )));
+ }
+ let rest = &rest[1..];
+ let end = rest
+ .find(quote)
+ .ok_or_else(|| MahoutError::InvalidInput(format!("Unterminated string
for '{}'", key)))?;
+ Ok(rest[..end].to_string())
+}
+
+fn parse_bool_value(header: &str, key: &str) -> Result<bool> {
+ let rest = parse_header_value(header, key)?;
+ if rest.starts_with("True") {
+ Ok(true)
+ } else if rest.starts_with("False") {
+ Ok(false)
+ } else {
+ Err(MahoutError::InvalidInput(format!(
+ "Expected True/False for '{}'",
+ key
+ )))
+ }
+}
+
+fn parse_shape_value(header: &str, key: &str) -> Result<Vec<usize>> {
+ let rest = parse_header_value(header, key)?;
+ let rest = rest.trim_start();
+ if !rest.starts_with('(') {
+ return Err(MahoutError::InvalidInput(
+ "Malformed shape in .npy header".to_string(),
+ ));
+ }
+ let end = rest
+ .find(')')
+ .ok_or_else(|| MahoutError::InvalidInput("Malformed shape in .npy
header".to_string()))?;
+ let inner = &rest[1..end];
+ let mut dims = Vec::new();
+ for part in inner.split(',') {
+ let part = part.trim();
+ if part.is_empty() {
+ continue;
+ }
+ let value = part.parse::<usize>().map_err(|e| {
+ MahoutError::InvalidInput(format!("Invalid shape value '{}': {}",
part, e))
+ })?;
+ dims.push(value);
+ }
+ if dims.is_empty() {
+ return Err(MahoutError::InvalidInput(
+ "Empty shape in .npy header".to_string(),
+ ));
+ }
+ Ok(dims)
+}
+
+fn validate_descr(descr: &str) -> Result<()> {
+ let (endian, typecode) = match descr.chars().next() {
+ Some('<') | Some('>') | Some('|') | Some('=') => {
+ (Some(descr.chars().next().unwrap()), &descr[1..])
+ }
+ _ => (None, descr),
+ };
+
+ if typecode != "f8" {
+ return Err(MahoutError::InvalidInput(format!(
+ "Unsupported dtype '{}' in .npy file (expected f8)",
+ descr
+ )));
+ }
+
+ if let Some('>') = endian {
+ return Err(MahoutError::InvalidInput(
+ "Big-endian .npy files are not supported".to_string(),
+ ));
+ }
+
+ if !cfg!(target_endian = "little") {
+ return Err(MahoutError::InvalidInput(
+ "NumPy .npy reader only supports little-endian hosts".to_string(),
+ ));
+ }
+
+ Ok(())
+}
+
+fn read_npy_header(path: &Path, file: &mut File) -> Result<NpyHeader> {
+ let mut magic = [0u8; 6];
+ file.read_exact(&mut magic)
+ .map_err(|e| MahoutError::Io(format!("Failed to read NumPy header:
{}", e)))?;
+ if &magic != NPY_MAGIC {
+ return Err(MahoutError::InvalidInput(
+ "Invalid .npy file magic header".to_string(),
+ ));
+ }
+
+ let mut version = [0u8; 2];
+ file.read_exact(&mut version)
+ .map_err(|e| MahoutError::Io(format!("Failed to read NumPy header:
{}", e)))?;
+ let major = version[0];
+ let minor = version[1];
+
+ let header_len = match major {
+ 1 => {
+ let mut len_bytes = [0u8; 2];
+ file.read_exact(&mut len_bytes)
+ .map_err(|e| MahoutError::Io(format!("Failed to read NumPy
header: {}", e)))?;
+ u16::from_le_bytes(len_bytes) as usize
+ }
+ 2 | 3 => {
+ let mut len_bytes = [0u8; 4];
+ file.read_exact(&mut len_bytes)
+ .map_err(|e| MahoutError::Io(format!("Failed to read NumPy
header: {}", e)))?;
+ u32::from_le_bytes(len_bytes) as usize
+ }
+ _ => {
+ return Err(MahoutError::InvalidInput(format!(
+ "Unsupported .npy version {}.{}",
+ major, minor
+ )));
+ }
+ };
+
+ let mut header_bytes = vec![0u8; header_len];
+ file.read_exact(&mut header_bytes)
+ .map_err(|e| MahoutError::Io(format!("Failed to read NumPy header:
{}", e)))?;
+ let header_str = std::str::from_utf8(&header_bytes)
+ .map_err(|e| MahoutError::InvalidInput(format!("Invalid .npy header
encoding: {}", e)))?;
+
+ let descr = parse_quoted_value(header_str, "descr")?;
+ validate_descr(&descr)?;
+ let fortran_order = parse_bool_value(header_str, "fortran_order")?;
+ let shape = parse_shape_value(header_str, "shape")?;
+ if shape.len() != 2 {
+ return Err(MahoutError::InvalidInput(format!(
+ "Expected 2D array, got {}D array with shape {:?}",
+ shape.len(),
+ shape
+ )));
+ }
+
+ let num_samples = shape[0];
+ let sample_size = shape[1];
+ if num_samples == 0 || sample_size == 0 {
+ return Err(MahoutError::InvalidInput(format!(
+ "Invalid array shape: [{}, {}]. Both dimensions must be > 0",
+ num_samples, sample_size
+ )));
+ }
+
+ let total_elements = num_samples
+ .checked_mul(sample_size)
+ .ok_or_else(|| MahoutError::InvalidInput("Array size
overflow".to_string()))?;
+ let data_len_bytes = total_elements
+ .checked_mul(std::mem::size_of::<f64>())
+ .ok_or_else(|| MahoutError::InvalidInput("Array size
overflow".to_string()))?;
+
+ let data_offset = file
+ .stream_position()
+ .map_err(|e| MahoutError::Io(format!("Failed to read NumPy header:
{}", e)))?;
+ let file_len = file
+ .metadata()
+ .map_err(|e| MahoutError::Io(format!("Failed to stat NumPy file: {}",
e)))?
+ .len();
+ if data_offset + data_len_bytes as u64 > file_len {
+ return Err(MahoutError::InvalidInput(format!(
+ "NumPy file {} is truncated (expected {} bytes of data)",
+ path.display(),
+ data_len_bytes
+ )));
+ }
+
+ Ok(NpyHeader {
+ fortran_order,
+ num_samples,
+ sample_size,
+ data_offset,
+ })
+}
+
+fn read_f64s_at(file: &mut File, offset: u64, out: &mut [f64]) -> Result<()> {
+ if out.is_empty() {
+ return Ok(());
+ }
+ if !cfg!(target_endian = "little") {
+ return Err(MahoutError::InvalidInput(
+ "NumPy .npy reader only supports little-endian hosts".to_string(),
+ ));
+ }
+ let byte_len = std::mem::size_of_val(out);
Review Comment:
The unsafe pointer cast creates a mutable byte slice from a mutable f64
slice. While this is valid for transmuting between types of the same size,
there's no documentation explaining why this is safe. Consider adding a safety
comment explaining that f64 and [u8; 8] have compatible representations on
little-endian systems, and that the endianness check on line 257 ensures this
precondition.
```suggestion
let byte_len = std::mem::size_of_val(out);
// SAFETY:
// - `out.as_mut_ptr()` points to `out.len()` consecutive `f64` values.
// - `byte_len` is computed with `size_of_val(out)`, so the resulting
// `u8` slice covers exactly the same memory region as `out` and no
more.
// - `u8` has alignment 1, which is weaker than `f64`'s alignment, so
// reinterpreting the buffer as `*mut u8` does not violate alignment.
// - We only reinterpret the existing `f64` storage as raw bytes; we do
// not change its length or layout, and we rely on the little-endian
// host check above to match NumPy's on-disk representation.
```
##########
qdp/qdp-core/src/readers/numpy.rs:
##########
@@ -219,6 +743,103 @@ mod tests {
fs::remove_file(temp_path).unwrap();
}
+ #[test]
+ fn test_numpy_streaming_reader_basic() {
+ let temp_path = "/tmp/test_numpy_streaming_basic.npy";
+ let num_samples = 4;
+ let sample_size = 3;
+
+ let data: Vec<f64> = (0..num_samples * sample_size).map(|i| i as
f64).collect();
+ let array = Array2::from_shape_vec((num_samples, sample_size),
data.clone()).unwrap();
+ ndarray_npy::write_npy(temp_path, &array).unwrap();
+
+ let mut reader = NumpyStreamingReader::new(temp_path).unwrap();
+ let mut buffer = vec![0.0; sample_size * 2];
+ let mut out = Vec::new();
+ loop {
+ let written = reader.read_chunk(&mut buffer).unwrap();
+ if written == 0 {
+ break;
+ }
+ out.extend_from_slice(&buffer[..written]);
+ }
+
+ assert_eq!(out, data);
+ assert_eq!(reader.total_rows(), num_samples);
+
+ fs::remove_file(temp_path).unwrap();
+ }
+
+ #[test]
+ fn test_numpy_streaming_reader_fortran_order() {
+ let temp_path = "/tmp/test_numpy_streaming_fortran.npy";
Review Comment:
Tests use hardcoded /tmp paths, which are not portable across operating
systems (e.g., Windows doesn't have /tmp). Consider using std::env::temp_dir()
or the tempfile crate to generate platform-independent temporary file paths
for better cross-platform compatibility.
##########
qdp/qdp-core/src/readers/numpy.rs:
##########
@@ -155,9 +428,260 @@ impl DataReader for NumpyReader {
}
}
+/// Streaming reader for NumPy `.npy` files containing 2D float64 arrays.
+///
+/// Reads data in chunks without loading the entire file into memory.
+pub struct NumpyStreamingReader {
+ file: File,
+ header: NpyHeader,
+ row_cursor: usize,
+ column_buf: Vec<f64>,
+}
+
+impl NumpyStreamingReader {
+ /// Create a new streaming NumPy reader.
+ pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+ let path = path.as_ref();
+
+ match path.try_exists() {
+ Ok(false) => {
+ return Err(MahoutError::Io(format!(
+ "NumPy file not found: {}",
+ path.display()
+ )));
+ }
+ Err(e) => {
+ return Err(MahoutError::Io(format!(
+ "Failed to check if NumPy file exists at {}: {}",
+ path.display(),
+ e
+ )));
+ }
+ Ok(true) => {}
+ }
+
+ let mut file = File::open(path)
+ .map_err(|e| MahoutError::Io(format!("Failed to open NumPy file:
{}", e)))?;
+ let header = read_npy_header(path, &mut file)?;
+
+ Ok(Self {
+ file,
+ header,
+ row_cursor: 0,
+ column_buf: Vec::new(),
+ })
+ }
+}
+
+impl DataReader for NumpyStreamingReader {
+ fn read_batch(&mut self) -> Result<(Vec<f64>, usize, usize)> {
+ let total_elements = self.header.total_elements();
+ let mut data = vec![0.0; total_elements];
+ let mut written = 0;
+ while written < total_elements {
+ let n = self.read_chunk(&mut data[written..])?;
+ if n == 0 {
+ break;
+ }
+ written += n;
+ }
+ if written != total_elements {
+ data.truncate(written);
+ }
+
+ Ok((data, self.header.num_samples, self.header.sample_size))
+ }
+
+ fn get_sample_size(&self) -> Option<usize> {
+ Some(self.header.sample_size)
+ }
+
+ fn get_num_samples(&self) -> Option<usize> {
+ Some(self.header.num_samples)
+ }
+}
+
+impl StreamingDataReader for NumpyStreamingReader {
+ fn read_chunk(&mut self, buffer: &mut [f64]) -> Result<usize> {
+ if self.row_cursor >= self.header.num_samples {
+ return Ok(0);
+ }
+
+ let sample_size = self.header.sample_size;
+ let max_rows = buffer.len() / sample_size;
+ if max_rows == 0 {
+ return Err(MahoutError::InvalidInput(format!(
+ "Buffer too small for one sample (need {} elements)",
+ sample_size
+ )));
+ }
+
+ let remaining_rows = self.header.num_samples - self.row_cursor;
+ let rows_to_read = std::cmp::min(max_rows, remaining_rows);
+ let elem_count = rows_to_read * sample_size;
+
+ if !self.header.fortran_order {
+ let offset = self.header.data_offset
+ + (self.row_cursor * sample_size * std::mem::size_of::<f64>())
as u64;
Review Comment:
The offset calculation could potentially overflow when converting to u64.
The expression `(self.row_cursor * sample_size * std::mem::size_of::<f64>()) as
u64` performs multiplication before the cast, which could overflow on large
datasets. Consider using checked arithmetic or performing the cast earlier in
the calculation chain to prevent potential overflow.
##########
qdp/qdp-core/src/readers/numpy.rs:
##########
@@ -219,6 +743,103 @@ mod tests {
fs::remove_file(temp_path).unwrap();
}
+ #[test]
+ fn test_numpy_streaming_reader_basic() {
+ let temp_path = "/tmp/test_numpy_streaming_basic.npy";
+ let num_samples = 4;
+ let sample_size = 3;
+
+ let data: Vec<f64> = (0..num_samples * sample_size).map(|i| i as
f64).collect();
+ let array = Array2::from_shape_vec((num_samples, sample_size),
data.clone()).unwrap();
+ ndarray_npy::write_npy(temp_path, &array).unwrap();
+
+ let mut reader = NumpyStreamingReader::new(temp_path).unwrap();
+ let mut buffer = vec![0.0; sample_size * 2];
+ let mut out = Vec::new();
+ loop {
+ let written = reader.read_chunk(&mut buffer).unwrap();
+ if written == 0 {
+ break;
+ }
+ out.extend_from_slice(&buffer[..written]);
+ }
+
+ assert_eq!(out, data);
+ assert_eq!(reader.total_rows(), num_samples);
+
+ fs::remove_file(temp_path).unwrap();
+ }
+
+ #[test]
+ fn test_numpy_streaming_reader_fortran_order() {
+ let temp_path = "/tmp/test_numpy_streaming_fortran.npy";
+ let num_samples = 3;
+ let sample_size = 4;
+
+ let data: Vec<f64> = (0..num_samples * sample_size).map(|i| i as
f64).collect();
+ let array = Array2::from_shape_vec((num_samples, sample_size),
data.clone()).unwrap();
+ let array_f = array.reversed_axes();
+ let array_f = array_f.as_standard_layout().reversed_axes();
+ ndarray_npy::write_npy(temp_path, &array_f).unwrap();
+
+ let mut reader = NumpyStreamingReader::new(temp_path).unwrap();
+ let mut buffer = vec![0.0; sample_size];
+ let mut out = Vec::new();
+ loop {
+ let written = reader.read_chunk(&mut buffer).unwrap();
+ if written == 0 {
+ break;
+ }
+ out.extend_from_slice(&buffer[..written]);
+ }
+
+ assert_eq!(out, data);
+
+ fs::remove_file(temp_path).unwrap();
+ }
+
+ #[test]
+ fn test_numpy_mmap_reader_basic() {
+ let temp_path = "/tmp/test_numpy_mmap_basic.npy";
+ let num_samples = 5;
+ let sample_size = 2;
+
+ let data: Vec<f64> = (0..num_samples * sample_size).map(|i| i as
f64).collect();
+ let array = Array2::from_shape_vec((num_samples, sample_size),
data.clone()).unwrap();
+ ndarray_npy::write_npy(temp_path, &array).unwrap();
+
+ let mut reader = NumpyMmapReader::new(temp_path).unwrap();
+ let (read_data, read_samples, read_size) =
reader.read_batch().unwrap();
+
+ assert_eq!(read_samples, num_samples);
+ assert_eq!(read_size, sample_size);
+ assert_eq!(read_data, data);
+
+ fs::remove_file(temp_path).unwrap();
+ }
+
+ #[test]
+ fn test_numpy_mmap_reader_fortran_order() {
+ let temp_path = "/tmp/test_numpy_mmap_fortran.npy";
Review Comment:
Tests use hardcoded /tmp paths, which are not portable across operating
systems (e.g., Windows doesn't have /tmp). Consider using std::env::temp_dir()
or the tempfile crate to generate platform-independent temporary file paths
for better cross-platform compatibility.
##########
qdp/qdp-core/src/readers/numpy.rs:
##########
@@ -155,9 +428,260 @@ impl DataReader for NumpyReader {
}
}
+/// Streaming reader for NumPy `.npy` files containing 2D float64 arrays.
+///
+/// Reads data in chunks without loading the entire file into memory.
+pub struct NumpyStreamingReader {
+ file: File,
+ header: NpyHeader,
+ row_cursor: usize,
+ column_buf: Vec<f64>,
+}
+
+impl NumpyStreamingReader {
+ /// Create a new streaming NumPy reader.
+ pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+ let path = path.as_ref();
+
+ match path.try_exists() {
+ Ok(false) => {
+ return Err(MahoutError::Io(format!(
+ "NumPy file not found: {}",
+ path.display()
+ )));
+ }
+ Err(e) => {
+ return Err(MahoutError::Io(format!(
+ "Failed to check if NumPy file exists at {}: {}",
+ path.display(),
+ e
+ )));
+ }
+ Ok(true) => {}
+ }
+
+ let mut file = File::open(path)
+ .map_err(|e| MahoutError::Io(format!("Failed to open NumPy file:
{}", e)))?;
+ let header = read_npy_header(path, &mut file)?;
+
+ Ok(Self {
+ file,
+ header,
+ row_cursor: 0,
+ column_buf: Vec::new(),
+ })
+ }
+}
+
+impl DataReader for NumpyStreamingReader {
+ fn read_batch(&mut self) -> Result<(Vec<f64>, usize, usize)> {
+ let total_elements = self.header.total_elements();
+ let mut data = vec![0.0; total_elements];
+ let mut written = 0;
+ while written < total_elements {
+ let n = self.read_chunk(&mut data[written..])?;
+ if n == 0 {
+ break;
+ }
+ written += n;
+ }
+ if written != total_elements {
+ data.truncate(written);
+ }
+
+ Ok((data, self.header.num_samples, self.header.sample_size))
+ }
+
+ fn get_sample_size(&self) -> Option<usize> {
+ Some(self.header.sample_size)
+ }
+
+ fn get_num_samples(&self) -> Option<usize> {
+ Some(self.header.num_samples)
+ }
+}
+
+impl StreamingDataReader for NumpyStreamingReader {
+ fn read_chunk(&mut self, buffer: &mut [f64]) -> Result<usize> {
+ if self.row_cursor >= self.header.num_samples {
+ return Ok(0);
+ }
+
+ let sample_size = self.header.sample_size;
+ let max_rows = buffer.len() / sample_size;
+ if max_rows == 0 {
+ return Err(MahoutError::InvalidInput(format!(
+ "Buffer too small for one sample (need {} elements)",
+ sample_size
+ )));
+ }
+
+ let remaining_rows = self.header.num_samples - self.row_cursor;
+ let rows_to_read = std::cmp::min(max_rows, remaining_rows);
+ let elem_count = rows_to_read * sample_size;
+
+ if !self.header.fortran_order {
+ let offset = self.header.data_offset
+ + (self.row_cursor * sample_size * std::mem::size_of::<f64>())
as u64;
+ read_f64s_at(&mut self.file, offset, &mut buffer[..elem_count])?;
+ } else {
+ if self.column_buf.len() < rows_to_read {
+ self.column_buf.resize(rows_to_read, 0.0);
+ }
+ for col in 0..sample_size {
+ let offset = self.header.data_offset
+ + ((col * self.header.num_samples + self.row_cursor)
+ * std::mem::size_of::<f64>()) as u64;
+ let column = &mut self.column_buf[..rows_to_read];
+ read_f64s_at(&mut self.file, offset, column)?;
+ for row in 0..rows_to_read {
+ buffer[row * sample_size + col] = column[row];
+ }
+ }
+ }
+
+ self.row_cursor += rows_to_read;
+ Ok(elem_count)
+ }
+
+ fn total_rows(&self) -> usize {
+ self.header.num_samples
+ }
+}
+
+/// Memory-mapped reader for NumPy `.npy` files containing 2D float64 arrays.
+///
+/// Maps the file into memory and streams slices without an extra read +
flatten pass.
+pub struct NumpyMmapReader {
+ mmap: Mmap,
+ header: NpyHeader,
+ row_cursor: usize,
+ column_buf: Vec<f64>,
+}
+
+impl NumpyMmapReader {
+ /// Create a new memory-mapped NumPy reader.
+ pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+ let path = path.as_ref();
+
+ match path.try_exists() {
+ Ok(false) => {
+ return Err(MahoutError::Io(format!(
+ "NumPy file not found: {}",
+ path.display()
+ )));
+ }
+ Err(e) => {
+ return Err(MahoutError::Io(format!(
+ "Failed to check if NumPy file exists at {}: {}",
+ path.display(),
+ e
+ )));
+ }
+ Ok(true) => {}
+ }
+
+ let mut file = File::open(path)
+ .map_err(|e| MahoutError::Io(format!("Failed to open NumPy file:
{}", e)))?;
+ let header = read_npy_header(path, &mut file)?;
+ let mmap = unsafe {
+ Mmap::map(&file)
+ .map_err(|e| MahoutError::Io(format!("Failed to mmap NumPy
file: {}", e)))?
+ };
Review Comment:
The unsafe mmap call requires additional safety documentation. Memory-mapped
files can have safety issues if the underlying file is modified by another
process or if the file is truncated. Consider adding a safety comment
explaining the assumptions (e.g., the file won't be modified during read) or
documenting this in the struct's documentation.
##########
qdp/qdp-core/src/readers/numpy.rs:
##########
@@ -219,6 +743,103 @@ mod tests {
fs::remove_file(temp_path).unwrap();
}
+ #[test]
+ fn test_numpy_streaming_reader_basic() {
+ let temp_path = "/tmp/test_numpy_streaming_basic.npy";
+ let num_samples = 4;
+ let sample_size = 3;
+
+ let data: Vec<f64> = (0..num_samples * sample_size).map(|i| i as
f64).collect();
+ let array = Array2::from_shape_vec((num_samples, sample_size),
data.clone()).unwrap();
+ ndarray_npy::write_npy(temp_path, &array).unwrap();
+
+ let mut reader = NumpyStreamingReader::new(temp_path).unwrap();
+ let mut buffer = vec![0.0; sample_size * 2];
+ let mut out = Vec::new();
+ loop {
+ let written = reader.read_chunk(&mut buffer).unwrap();
+ if written == 0 {
+ break;
+ }
+ out.extend_from_slice(&buffer[..written]);
+ }
+
+ assert_eq!(out, data);
+ assert_eq!(reader.total_rows(), num_samples);
+
+ fs::remove_file(temp_path).unwrap();
Review Comment:
Test cleanup with fs::remove_file().unwrap() can panic if the file removal
fails (e.g., file already deleted, permission issues). This could cause the
test to panic even if the actual test logic passed. Consider using a result
pattern or ignoring errors during cleanup to make tests more robust.
##########
qdp/qdp-core/src/readers/numpy.rs:
##########
@@ -155,9 +428,260 @@ impl DataReader for NumpyReader {
}
}
+/// Streaming reader for NumPy `.npy` files containing 2D float64 arrays.
+///
+/// Reads data in chunks without loading the entire file into memory.
+pub struct NumpyStreamingReader {
+ file: File,
+ header: NpyHeader,
+ row_cursor: usize,
+ column_buf: Vec<f64>,
+}
+
+impl NumpyStreamingReader {
+ /// Create a new streaming NumPy reader.
+ pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+ let path = path.as_ref();
+
+ match path.try_exists() {
+ Ok(false) => {
+ return Err(MahoutError::Io(format!(
+ "NumPy file not found: {}",
+ path.display()
+ )));
+ }
+ Err(e) => {
+ return Err(MahoutError::Io(format!(
+ "Failed to check if NumPy file exists at {}: {}",
+ path.display(),
+ e
+ )));
+ }
+ Ok(true) => {}
+ }
+
+ let mut file = File::open(path)
+ .map_err(|e| MahoutError::Io(format!("Failed to open NumPy file:
{}", e)))?;
+ let header = read_npy_header(path, &mut file)?;
+
+ Ok(Self {
+ file,
+ header,
+ row_cursor: 0,
+ column_buf: Vec::new(),
+ })
+ }
+}
+
+impl DataReader for NumpyStreamingReader {
+ fn read_batch(&mut self) -> Result<(Vec<f64>, usize, usize)> {
+ let total_elements = self.header.total_elements();
+ let mut data = vec![0.0; total_elements];
+ let mut written = 0;
+ while written < total_elements {
+ let n = self.read_chunk(&mut data[written..])?;
+ if n == 0 {
+ break;
+ }
+ written += n;
+ }
+ if written != total_elements {
+ data.truncate(written);
+ }
+
+ Ok((data, self.header.num_samples, self.header.sample_size))
+ }
+
+ fn get_sample_size(&self) -> Option<usize> {
+ Some(self.header.sample_size)
+ }
+
+ fn get_num_samples(&self) -> Option<usize> {
+ Some(self.header.num_samples)
+ }
+}
+
+impl StreamingDataReader for NumpyStreamingReader {
+ fn read_chunk(&mut self, buffer: &mut [f64]) -> Result<usize> {
+ if self.row_cursor >= self.header.num_samples {
+ return Ok(0);
+ }
+
+ let sample_size = self.header.sample_size;
+ let max_rows = buffer.len() / sample_size;
+ if max_rows == 0 {
+ return Err(MahoutError::InvalidInput(format!(
+ "Buffer too small for one sample (need {} elements)",
+ sample_size
+ )));
+ }
+
+ let remaining_rows = self.header.num_samples - self.row_cursor;
+ let rows_to_read = std::cmp::min(max_rows, remaining_rows);
+ let elem_count = rows_to_read * sample_size;
+
+ if !self.header.fortran_order {
+ let offset = self.header.data_offset
+ + (self.row_cursor * sample_size * std::mem::size_of::<f64>())
as u64;
+ read_f64s_at(&mut self.file, offset, &mut buffer[..elem_count])?;
+ } else {
+ if self.column_buf.len() < rows_to_read {
+ self.column_buf.resize(rows_to_read, 0.0);
+ }
+ for col in 0..sample_size {
+ let offset = self.header.data_offset
+ + ((col * self.header.num_samples + self.row_cursor)
+ * std::mem::size_of::<f64>()) as u64;
+ let column = &mut self.column_buf[..rows_to_read];
+ read_f64s_at(&mut self.file, offset, column)?;
+ for row in 0..rows_to_read {
+ buffer[row * sample_size + col] = column[row];
+ }
+ }
+ }
+
+ self.row_cursor += rows_to_read;
+ Ok(elem_count)
+ }
+
+ fn total_rows(&self) -> usize {
+ self.header.num_samples
+ }
+}
+
+/// Memory-mapped reader for NumPy `.npy` files containing 2D float64 arrays.
+///
+/// Maps the file into memory and streams slices without an extra read +
flatten pass.
+pub struct NumpyMmapReader {
+ mmap: Mmap,
+ header: NpyHeader,
+ row_cursor: usize,
+ column_buf: Vec<f64>,
+}
+
+impl NumpyMmapReader {
+ /// Create a new memory-mapped NumPy reader.
+ pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+ let path = path.as_ref();
+
+ match path.try_exists() {
+ Ok(false) => {
+ return Err(MahoutError::Io(format!(
+ "NumPy file not found: {}",
+ path.display()
+ )));
+ }
+ Err(e) => {
+ return Err(MahoutError::Io(format!(
+ "Failed to check if NumPy file exists at {}: {}",
+ path.display(),
+ e
+ )));
+ }
+ Ok(true) => {}
+ }
+
+ let mut file = File::open(path)
+ .map_err(|e| MahoutError::Io(format!("Failed to open NumPy file:
{}", e)))?;
+ let header = read_npy_header(path, &mut file)?;
+ let mmap = unsafe {
+ Mmap::map(&file)
+ .map_err(|e| MahoutError::Io(format!("Failed to mmap NumPy
file: {}", e)))?
+ };
+
+ Ok(Self {
+ mmap,
+ header,
+ row_cursor: 0,
+ column_buf: Vec::new(),
+ })
+ }
+}
+
+impl DataReader for NumpyMmapReader {
+ fn read_batch(&mut self) -> Result<(Vec<f64>, usize, usize)> {
+ let total_elements = self.header.total_elements();
+ let mut data = vec![0.0; total_elements];
+ let mut written = 0;
+ while written < total_elements {
+ let n = self.read_chunk(&mut data[written..])?;
+ if n == 0 {
+ break;
+ }
+ written += n;
+ }
+ if written != total_elements {
+ data.truncate(written);
+ }
+
+ Ok((data, self.header.num_samples, self.header.sample_size))
+ }
+
+ fn get_sample_size(&self) -> Option<usize> {
+ Some(self.header.sample_size)
+ }
+
+ fn get_num_samples(&self) -> Option<usize> {
+ Some(self.header.num_samples)
+ }
+}
+
+impl StreamingDataReader for NumpyMmapReader {
+ fn read_chunk(&mut self, buffer: &mut [f64]) -> Result<usize> {
+ if self.row_cursor >= self.header.num_samples {
+ return Ok(0);
+ }
+
+ let sample_size = self.header.sample_size;
+ let max_rows = buffer.len() / sample_size;
+ if max_rows == 0 {
+ return Err(MahoutError::InvalidInput(format!(
+ "Buffer too small for one sample (need {} elements)",
+ sample_size
+ )));
+ }
+
+ let remaining_rows = self.header.num_samples - self.row_cursor;
+ let rows_to_read = std::cmp::min(max_rows, remaining_rows);
+ let elem_count = rows_to_read * sample_size;
+ let data_base = self.header.data_offset as usize;
+
+ if !self.header.fortran_order {
+ let start = data_base + self.row_cursor * sample_size *
std::mem::size_of::<f64>();
+ let end = start + elem_count * std::mem::size_of::<f64>();
+ let bytes = &self.mmap[start..end];
+ copy_f64s_from_bytes(bytes, &mut buffer[..elem_count])?;
+ } else {
+ if self.column_buf.len() < rows_to_read {
+ self.column_buf.resize(rows_to_read, 0.0);
+ }
+ for col in 0..sample_size {
+ let start = data_base
+ + (col * self.header.num_samples + self.row_cursor)
+ * std::mem::size_of::<f64>();
+ let end = start + rows_to_read * std::mem::size_of::<f64>();
Review Comment:
Similar overflow potential exists in this calculation: `(col *
self.header.num_samples + self.row_cursor) * std::mem::size_of::<f64>()`. The
nested multiplication could overflow on large datasets. Consider using checked
arithmetic to prevent potential overflow and provide clear error messages.
```suggestion
let elem_size = std::mem::size_of::<f64>();
for col in 0..sample_size {
let elem_offset = col
.checked_mul(self.header.num_samples)
.and_then(|v| v.checked_add(self.row_cursor))
.ok_or_else(|| {
MahoutError::InvalidInput(
"Overflow in element offset calculation for
NumPy mmap reader"
.to_string(),
)
})?;
let byte_offset = elem_offset
.checked_mul(elem_size)
.ok_or_else(|| {
MahoutError::InvalidInput(
"Overflow in byte offset calculation for NumPy
mmap reader"
.to_string(),
)
})?;
let start = data_base
.checked_add(byte_offset)
.ok_or_else(|| {
MahoutError::InvalidInput(
"Overflow when adding base offset for NumPy mmap
reader"
.to_string(),
)
})?;
let byte_len = rows_to_read
.checked_mul(elem_size)
.ok_or_else(|| {
MahoutError::InvalidInput(
"Overflow in chunk length calculation for NumPy
mmap reader"
.to_string(),
)
})?;
let end = start
.checked_add(byte_len)
.ok_or_else(|| {
MahoutError::InvalidInput(
"Overflow when computing end offset for NumPy
mmap reader"
.to_string(),
)
})?;
```
##########
qdp/qdp-core/src/readers/numpy.rs:
##########
@@ -219,6 +743,103 @@ mod tests {
fs::remove_file(temp_path).unwrap();
}
+ #[test]
+ fn test_numpy_streaming_reader_basic() {
+ let temp_path = "/tmp/test_numpy_streaming_basic.npy";
+ let num_samples = 4;
+ let sample_size = 3;
+
+ let data: Vec<f64> = (0..num_samples * sample_size).map(|i| i as
f64).collect();
+ let array = Array2::from_shape_vec((num_samples, sample_size),
data.clone()).unwrap();
+ ndarray_npy::write_npy(temp_path, &array).unwrap();
+
+ let mut reader = NumpyStreamingReader::new(temp_path).unwrap();
+ let mut buffer = vec![0.0; sample_size * 2];
+ let mut out = Vec::new();
+ loop {
+ let written = reader.read_chunk(&mut buffer).unwrap();
+ if written == 0 {
+ break;
+ }
+ out.extend_from_slice(&buffer[..written]);
+ }
+
+ assert_eq!(out, data);
+ assert_eq!(reader.total_rows(), num_samples);
+
+ fs::remove_file(temp_path).unwrap();
+ }
+
+ #[test]
+ fn test_numpy_streaming_reader_fortran_order() {
+ let temp_path = "/tmp/test_numpy_streaming_fortran.npy";
+ let num_samples = 3;
+ let sample_size = 4;
+
+ let data: Vec<f64> = (0..num_samples * sample_size).map(|i| i as
f64).collect();
+ let array = Array2::from_shape_vec((num_samples, sample_size),
data.clone()).unwrap();
+ let array_f = array.reversed_axes();
+ let array_f = array_f.as_standard_layout().reversed_axes();
+ ndarray_npy::write_npy(temp_path, &array_f).unwrap();
+
+ let mut reader = NumpyStreamingReader::new(temp_path).unwrap();
+ let mut buffer = vec![0.0; sample_size];
+ let mut out = Vec::new();
+ loop {
+ let written = reader.read_chunk(&mut buffer).unwrap();
+ if written == 0 {
+ break;
+ }
+ out.extend_from_slice(&buffer[..written]);
+ }
+
+ assert_eq!(out, data);
+
+ fs::remove_file(temp_path).unwrap();
+ }
+
+ #[test]
+ fn test_numpy_mmap_reader_basic() {
+ let temp_path = "/tmp/test_numpy_mmap_basic.npy";
Review Comment:
Tests use hardcoded /tmp paths which are not portable across operating
systems (e.g., Windows doesn't have /tmp). Consider using std::env::temp_dir()
or tempfile crate to generate platform-independent temporary file paths for
better cross-platform compatibility.
##########
qdp/qdp-core/src/readers/numpy.rs:
##########
@@ -18,13 +18,286 @@
//!
//! Provides support for reading .npy files containing 2D float64 arrays.
-use std::path::Path;
+use std::fs::File;
+use std::io::{Read, Seek, SeekFrom};
+use std::path::{Path, PathBuf};
+use memmap2::Mmap;
use ndarray::Array2;
use ndarray_npy::ReadNpyError;
use crate::error::{MahoutError, Result};
-use crate::reader::DataReader;
+use crate::reader::{DataReader, StreamingDataReader};
+
+const NPY_MAGIC: &[u8; 6] = b"\x93NUMPY";
+
+#[derive(Clone, Debug)]
+struct NpyHeader {
+ fortran_order: bool,
+ num_samples: usize,
+ sample_size: usize,
+ data_offset: u64,
+}
+
+impl NpyHeader {
+ fn total_elements(&self) -> usize {
+ self.num_samples * self.sample_size
+ }
+}
+
+fn parse_header_value<'a>(header: &'a str, key: &str) -> Result<&'a str> {
+ let key_single = format!("'{}'", key);
+ let mut start = header.find(&key_single);
+ if start.is_none() {
+ let key_double = format!("\"{}\"", key);
+ start = header.find(&key_double);
+ }
+ let start = start.ok_or_else(|| {
+ MahoutError::InvalidInput(format!("Missing '{}' entry in .npy header",
key))
+ })?;
+ let rest = &header[start..];
+ let colon = rest
+ .find(':')
+ .ok_or_else(|| MahoutError::InvalidInput("Malformed .npy
header".to_string()))?;
+ Ok(rest[colon + 1..].trim_start())
+}
+
+fn parse_quoted_value(header: &str, key: &str) -> Result<String> {
+ let rest = parse_header_value(header, key)?;
+ let mut chars = rest.chars();
+ let quote = chars
+ .next()
+ .ok_or_else(|| MahoutError::InvalidInput("Malformed .npy
header".to_string()))?;
+ if quote != '\'' && quote != '"' {
+ return Err(MahoutError::InvalidInput(format!(
+ "Expected quoted value for '{}'",
+ key
+ )));
+ }
+ let rest = &rest[1..];
+ let end = rest
+ .find(quote)
+ .ok_or_else(|| MahoutError::InvalidInput(format!("Unterminated string
for '{}'", key)))?;
+ Ok(rest[..end].to_string())
+}
+
+fn parse_bool_value(header: &str, key: &str) -> Result<bool> {
+ let rest = parse_header_value(header, key)?;
+ if rest.starts_with("True") {
+ Ok(true)
+ } else if rest.starts_with("False") {
+ Ok(false)
+ } else {
+ Err(MahoutError::InvalidInput(format!(
+ "Expected True/False for '{}'",
+ key
+ )))
+ }
+}
+
+fn parse_shape_value(header: &str, key: &str) -> Result<Vec<usize>> {
+ let rest = parse_header_value(header, key)?;
+ let rest = rest.trim_start();
+ if !rest.starts_with('(') {
+ return Err(MahoutError::InvalidInput(
+ "Malformed shape in .npy header".to_string(),
+ ));
+ }
+ let end = rest
+ .find(')')
+ .ok_or_else(|| MahoutError::InvalidInput("Malformed shape in .npy
header".to_string()))?;
+ let inner = &rest[1..end];
+ let mut dims = Vec::new();
+ for part in inner.split(',') {
+ let part = part.trim();
+ if part.is_empty() {
+ continue;
+ }
+ let value = part.parse::<usize>().map_err(|e| {
+ MahoutError::InvalidInput(format!("Invalid shape value '{}': {}",
part, e))
+ })?;
+ dims.push(value);
+ }
+ if dims.is_empty() {
+ return Err(MahoutError::InvalidInput(
+ "Empty shape in .npy header".to_string(),
+ ));
+ }
+ Ok(dims)
+}
+
+fn validate_descr(descr: &str) -> Result<()> {
+ let (endian, typecode) = match descr.chars().next() {
+ Some('<') | Some('>') | Some('|') | Some('=') => {
+ (Some(descr.chars().next().unwrap()), &descr[1..])
+ }
+ _ => (None, descr),
+ };
+
+ if typecode != "f8" {
+ return Err(MahoutError::InvalidInput(format!(
+ "Unsupported dtype '{}' in .npy file (expected f8)",
+ descr
+ )));
+ }
+
+ if let Some('>') = endian {
+ return Err(MahoutError::InvalidInput(
+ "Big-endian .npy files are not supported".to_string(),
+ ));
+ }
+
+ if !cfg!(target_endian = "little") {
+ return Err(MahoutError::InvalidInput(
+ "NumPy .npy reader only supports little-endian hosts".to_string(),
+ ));
+ }
+
+ Ok(())
+}
+
+fn read_npy_header(path: &Path, file: &mut File) -> Result<NpyHeader> {
+ let mut magic = [0u8; 6];
+ file.read_exact(&mut magic)
+ .map_err(|e| MahoutError::Io(format!("Failed to read NumPy header:
{}", e)))?;
+ if &magic != NPY_MAGIC {
+ return Err(MahoutError::InvalidInput(
+ "Invalid .npy file magic header".to_string(),
+ ));
+ }
+
+ let mut version = [0u8; 2];
+ file.read_exact(&mut version)
+ .map_err(|e| MahoutError::Io(format!("Failed to read NumPy header:
{}", e)))?;
+ let major = version[0];
+ let minor = version[1];
+
+ let header_len = match major {
+ 1 => {
+ let mut len_bytes = [0u8; 2];
+ file.read_exact(&mut len_bytes)
+ .map_err(|e| MahoutError::Io(format!("Failed to read NumPy
header: {}", e)))?;
+ u16::from_le_bytes(len_bytes) as usize
+ }
+ 2 | 3 => {
+ let mut len_bytes = [0u8; 4];
+ file.read_exact(&mut len_bytes)
+ .map_err(|e| MahoutError::Io(format!("Failed to read NumPy
header: {}", e)))?;
+ u32::from_le_bytes(len_bytes) as usize
+ }
+ _ => {
+ return Err(MahoutError::InvalidInput(format!(
+ "Unsupported .npy version {}.{}",
+ major, minor
+ )));
+ }
+ };
+
+ let mut header_bytes = vec![0u8; header_len];
+ file.read_exact(&mut header_bytes)
+ .map_err(|e| MahoutError::Io(format!("Failed to read NumPy header:
{}", e)))?;
+ let header_str = std::str::from_utf8(&header_bytes)
+ .map_err(|e| MahoutError::InvalidInput(format!("Invalid .npy header
encoding: {}", e)))?;
+
+ let descr = parse_quoted_value(header_str, "descr")?;
+ validate_descr(&descr)?;
+ let fortran_order = parse_bool_value(header_str, "fortran_order")?;
+ let shape = parse_shape_value(header_str, "shape")?;
+ if shape.len() != 2 {
+ return Err(MahoutError::InvalidInput(format!(
+ "Expected 2D array, got {}D array with shape {:?}",
+ shape.len(),
+ shape
+ )));
+ }
+
+ let num_samples = shape[0];
+ let sample_size = shape[1];
+ if num_samples == 0 || sample_size == 0 {
+ return Err(MahoutError::InvalidInput(format!(
+ "Invalid array shape: [{}, {}]. Both dimensions must be > 0",
+ num_samples, sample_size
+ )));
+ }
+
+ let total_elements = num_samples
+ .checked_mul(sample_size)
+ .ok_or_else(|| MahoutError::InvalidInput("Array size
overflow".to_string()))?;
+ let data_len_bytes = total_elements
+ .checked_mul(std::mem::size_of::<f64>())
+ .ok_or_else(|| MahoutError::InvalidInput("Array size
overflow".to_string()))?;
+
+ let data_offset = file
+ .stream_position()
+ .map_err(|e| MahoutError::Io(format!("Failed to read NumPy header:
{}", e)))?;
+ let file_len = file
+ .metadata()
+ .map_err(|e| MahoutError::Io(format!("Failed to stat NumPy file: {}",
e)))?
+ .len();
+ if data_offset + data_len_bytes as u64 > file_len {
+ return Err(MahoutError::InvalidInput(format!(
+ "NumPy file {} is truncated (expected {} bytes of data)",
+ path.display(),
+ data_len_bytes
+ )));
+ }
+
+ Ok(NpyHeader {
+ fortran_order,
+ num_samples,
+ sample_size,
+ data_offset,
+ })
+}
+
+fn read_f64s_at(file: &mut File, offset: u64, out: &mut [f64]) -> Result<()> {
+ if out.is_empty() {
+ return Ok(());
+ }
+ if !cfg!(target_endian = "little") {
+ return Err(MahoutError::InvalidInput(
+ "NumPy .npy reader only supports little-endian hosts".to_string(),
+ ));
+ }
+ let byte_len = std::mem::size_of_val(out);
+ let out_bytes =
+ unsafe { std::slice::from_raw_parts_mut(out.as_mut_ptr() as *mut u8,
byte_len) };
+ file.seek(SeekFrom::Start(offset))
+ .map_err(|e| MahoutError::Io(format!("Failed to seek NumPy file: {}",
e)))?;
+ file.read_exact(out_bytes)
+ .map_err(|e| MahoutError::Io(format!("Failed to read NumPy data: {}",
e)))?;
+ Ok(())
+}
+
+fn copy_f64s_from_bytes(bytes: &[u8], out: &mut [f64]) -> Result<()> {
+ if out.is_empty() {
+ return Ok(());
+ }
+ if bytes.len() != std::mem::size_of_val(out) {
+ return Err(MahoutError::InvalidInput(
+ "Byte slice length does not match output buffer".to_string(),
+ ));
+ }
+ if !cfg!(target_endian = "little") {
+ return Err(MahoutError::InvalidInput(
+ "NumPy .npy reader only supports little-endian hosts".to_string(),
+ ));
+ }
+
+ let align = std::mem::align_of::<f64>();
+ if (bytes.as_ptr() as usize).is_multiple_of(align) {
+ let src = unsafe { std::slice::from_raw_parts(bytes.as_ptr() as *const
f64, out.len()) };
+ out.copy_from_slice(src);
+ return Ok(());
Review Comment:
There's a potential issue with unsafe code alignment checking. The condition
`is_multiple_of` is used to check alignment, but this doesn't guarantee that
the pointer remains valid for the entire slice length. Additionally, casting
byte slices to f64 slices without proper validation could cause undefined
behavior if the bytes don't represent valid f64 values (though this is unlikely
in practice for binary data). Consider adding a safety comment explaining why
this is safe, or using a safer alternative like bytemuck crate with proper
validation.
##########
qdp/qdp-core/src/readers/numpy.rs:
##########
@@ -155,9 +428,260 @@ impl DataReader for NumpyReader {
}
}
+/// Streaming reader for NumPy `.npy` files containing 2D float64 arrays.
+///
+/// Reads data in chunks without loading the entire file into memory.
+pub struct NumpyStreamingReader {
+ file: File,
+ header: NpyHeader,
+ row_cursor: usize,
+ column_buf: Vec<f64>,
+}
+
+impl NumpyStreamingReader {
+ /// Create a new streaming NumPy reader.
+ pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+ let path = path.as_ref();
+
+ match path.try_exists() {
+ Ok(false) => {
+ return Err(MahoutError::Io(format!(
+ "NumPy file not found: {}",
+ path.display()
+ )));
+ }
+ Err(e) => {
+ return Err(MahoutError::Io(format!(
+ "Failed to check if NumPy file exists at {}: {}",
+ path.display(),
+ e
+ )));
+ }
+ Ok(true) => {}
+ }
+
+ let mut file = File::open(path)
+ .map_err(|e| MahoutError::Io(format!("Failed to open NumPy file:
{}", e)))?;
+ let header = read_npy_header(path, &mut file)?;
+
+ Ok(Self {
+ file,
+ header,
+ row_cursor: 0,
+ column_buf: Vec::new(),
+ })
+ }
+}
+
+impl DataReader for NumpyStreamingReader {
+ fn read_batch(&mut self) -> Result<(Vec<f64>, usize, usize)> {
+ let total_elements = self.header.total_elements();
+ let mut data = vec![0.0; total_elements];
+ let mut written = 0;
+ while written < total_elements {
+ let n = self.read_chunk(&mut data[written..])?;
+ if n == 0 {
+ break;
+ }
+ written += n;
+ }
+ if written != total_elements {
+ data.truncate(written);
+ }
+
+ Ok((data, self.header.num_samples, self.header.sample_size))
+ }
+
+ fn get_sample_size(&self) -> Option<usize> {
+ Some(self.header.sample_size)
+ }
+
+ fn get_num_samples(&self) -> Option<usize> {
+ Some(self.header.num_samples)
+ }
+}
+
+impl StreamingDataReader for NumpyStreamingReader {
+ fn read_chunk(&mut self, buffer: &mut [f64]) -> Result<usize> {
+ if self.row_cursor >= self.header.num_samples {
+ return Ok(0);
+ }
+
+ let sample_size = self.header.sample_size;
+ let max_rows = buffer.len() / sample_size;
+ if max_rows == 0 {
+ return Err(MahoutError::InvalidInput(format!(
+ "Buffer too small for one sample (need {} elements)",
+ sample_size
+ )));
+ }
+
+ let remaining_rows = self.header.num_samples - self.row_cursor;
+ let rows_to_read = std::cmp::min(max_rows, remaining_rows);
+ let elem_count = rows_to_read * sample_size;
+
+ if !self.header.fortran_order {
+ let offset = self.header.data_offset
+ + (self.row_cursor * sample_size * std::mem::size_of::<f64>())
as u64;
+ read_f64s_at(&mut self.file, offset, &mut buffer[..elem_count])?;
+ } else {
+ if self.column_buf.len() < rows_to_read {
+ self.column_buf.resize(rows_to_read, 0.0);
+ }
+ for col in 0..sample_size {
+ let offset = self.header.data_offset
+ + ((col * self.header.num_samples + self.row_cursor)
+ * std::mem::size_of::<f64>()) as u64;
+ let column = &mut self.column_buf[..rows_to_read];
+ read_f64s_at(&mut self.file, offset, column)?;
+ for row in 0..rows_to_read {
+ buffer[row * sample_size + col] = column[row];
+ }
+ }
+ }
+
+ self.row_cursor += rows_to_read;
+ Ok(elem_count)
+ }
+
+ fn total_rows(&self) -> usize {
+ self.header.num_samples
+ }
+}
+
/// Memory-mapped reader for NumPy `.npy` files containing 2D float64 arrays.
///
/// Maps the file into memory and streams slices without an extra read + flatten pass.
pub struct NumpyMmapReader {
    // Read-only memory map covering the whole .npy file (header + data).
    mmap: Mmap,
    // Parsed and validated layout metadata, including the data offset.
    header: NpyHeader,
    // Index of the next row to hand out during chunked reads.
    row_cursor: usize,
    // Scratch buffer for transposing Fortran-order columns.
    column_buf: Vec<f64>,
}
+
+impl NumpyMmapReader {
+ /// Create a new memory-mapped NumPy reader.
+ pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+ let path = path.as_ref();
+
+ match path.try_exists() {
+ Ok(false) => {
+ return Err(MahoutError::Io(format!(
+ "NumPy file not found: {}",
+ path.display()
+ )));
+ }
+ Err(e) => {
+ return Err(MahoutError::Io(format!(
+ "Failed to check if NumPy file exists at {}: {}",
+ path.display(),
+ e
+ )));
+ }
+ Ok(true) => {}
+ }
Review Comment:
Similar to the streaming reader's file existence check, this duplicates
error handling logic. The same validation pattern appears in lines 446-461 and
567-582. Consider extracting this into a helper function to reduce code
duplication and ensure consistent error messages.
```suggestion
fn ensure_numpy_file_exists(path: &Path) -> Result<()> {
match path.try_exists() {
Ok(false) => Err(MahoutError::Io(format!(
"NumPy file not found: {}",
path.display()
))),
Err(e) => Err(MahoutError::Io(format!(
"Failed to check if NumPy file exists at {}: {}",
path.display(),
e
))),
Ok(true) => Ok(()),
}
}
impl NumpyMmapReader {
/// Create a new memory-mapped NumPy reader.
pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
let path = path.as_ref();
ensure_numpy_file_exists(path)?;
```
##########
qdp/qdp-core/src/readers/numpy.rs:
##########
@@ -219,6 +743,103 @@ mod tests {
fs::remove_file(temp_path).unwrap();
}
+ #[test]
+ fn test_numpy_streaming_reader_basic() {
+ let temp_path = "/tmp/test_numpy_streaming_basic.npy";
+ let num_samples = 4;
+ let sample_size = 3;
+
+ let data: Vec<f64> = (0..num_samples * sample_size).map(|i| i as
f64).collect();
+ let array = Array2::from_shape_vec((num_samples, sample_size),
data.clone()).unwrap();
+ ndarray_npy::write_npy(temp_path, &array).unwrap();
+
+ let mut reader = NumpyStreamingReader::new(temp_path).unwrap();
+ let mut buffer = vec![0.0; sample_size * 2];
+ let mut out = Vec::new();
+ loop {
+ let written = reader.read_chunk(&mut buffer).unwrap();
+ if written == 0 {
+ break;
+ }
+ out.extend_from_slice(&buffer[..written]);
+ }
+
+ assert_eq!(out, data);
+ assert_eq!(reader.total_rows(), num_samples);
+
+ fs::remove_file(temp_path).unwrap();
+ }
+
+ #[test]
+ fn test_numpy_streaming_reader_fortran_order() {
+ let temp_path = "/tmp/test_numpy_streaming_fortran.npy";
+ let num_samples = 3;
+ let sample_size = 4;
+
+ let data: Vec<f64> = (0..num_samples * sample_size).map(|i| i as
f64).collect();
+ let array = Array2::from_shape_vec((num_samples, sample_size),
data.clone()).unwrap();
+ let array_f = array.reversed_axes();
+ let array_f = array_f.as_standard_layout().reversed_axes();
+ ndarray_npy::write_npy(temp_path, &array_f).unwrap();
+
+ let mut reader = NumpyStreamingReader::new(temp_path).unwrap();
+ let mut buffer = vec![0.0; sample_size];
+ let mut out = Vec::new();
+ loop {
+ let written = reader.read_chunk(&mut buffer).unwrap();
+ if written == 0 {
+ break;
+ }
+ out.extend_from_slice(&buffer[..written]);
+ }
+
+ assert_eq!(out, data);
+
+ fs::remove_file(temp_path).unwrap();
+ }
+
+ #[test]
+ fn test_numpy_mmap_reader_basic() {
+ let temp_path = "/tmp/test_numpy_mmap_basic.npy";
+ let num_samples = 5;
+ let sample_size = 2;
+
+ let data: Vec<f64> = (0..num_samples * sample_size).map(|i| i as
f64).collect();
+ let array = Array2::from_shape_vec((num_samples, sample_size),
data.clone()).unwrap();
+ ndarray_npy::write_npy(temp_path, &array).unwrap();
+
+ let mut reader = NumpyMmapReader::new(temp_path).unwrap();
+ let (read_data, read_samples, read_size) =
reader.read_batch().unwrap();
+
+ assert_eq!(read_samples, num_samples);
+ assert_eq!(read_size, sample_size);
+ assert_eq!(read_data, data);
+
+ fs::remove_file(temp_path).unwrap();
+ }
+
+ #[test]
+ fn test_numpy_mmap_reader_fortran_order() {
+ let temp_path = "/tmp/test_numpy_mmap_fortran.npy";
+ let num_samples = 2;
+ let sample_size = 3;
+
+ let data: Vec<f64> = (0..num_samples * sample_size).map(|i| i as
f64).collect();
+ let array = Array2::from_shape_vec((num_samples, sample_size),
data.clone()).unwrap();
+ let array_f = array.reversed_axes();
+ let array_f = array_f.as_standard_layout().reversed_axes();
+ ndarray_npy::write_npy(temp_path, &array_f).unwrap();
+
+ let mut reader = NumpyMmapReader::new(temp_path).unwrap();
+ let (read_data, read_samples, read_size) =
reader.read_batch().unwrap();
+
+ assert_eq!(read_samples, num_samples);
+ assert_eq!(read_size, sample_size);
+ assert_eq!(read_data, data);
+
+ fs::remove_file(temp_path).unwrap();
Review Comment:
Test cleanup with fs::remove_file().unwrap() can panic if the file removal
fails (e.g., file already deleted, permission issues). This could cause the
test to panic even if the actual test logic passed. Consider using a result
pattern or ignore errors during cleanup to make tests more robust.
##########
qdp/qdp-core/src/readers/numpy.rs:
##########
@@ -18,13 +18,286 @@
//!
//! Provides support for reading .npy files containing 2D float64 arrays.
-use std::path::Path;
+use std::fs::File;
+use std::io::{Read, Seek, SeekFrom};
+use std::path::{Path, PathBuf};
+use memmap2::Mmap;
use ndarray::Array2;
use ndarray_npy::ReadNpyError;
use crate::error::{MahoutError, Result};
-use crate::reader::DataReader;
+use crate::reader::{DataReader, StreamingDataReader};
+
+const NPY_MAGIC: &[u8; 6] = b"\x93NUMPY";
+
+#[derive(Clone, Debug)]
+struct NpyHeader {
+ fortran_order: bool,
+ num_samples: usize,
+ sample_size: usize,
+ data_offset: u64,
+}
+
+impl NpyHeader {
+ fn total_elements(&self) -> usize {
+ self.num_samples * self.sample_size
Review Comment:
The total_elements method performs multiplication without overflow checking.
While the header validation in read_npy_header does check for overflow (lines
223-225), this method could still be called in other contexts where overflow
wasn't previously validated. Consider using checked_mul here as well for
consistency and safety, or document that this method assumes validation has
already been performed.
```suggestion
/// Returns the total number of elements represented by this header.
///
/// Panics if the multiplication of `num_samples` and `sample_size`
overflows `usize`.
fn total_elements(&self) -> usize {
self.num_samples
.checked_mul(self.sample_size)
.expect("overflow while computing total number of elements from
NpyHeader")
```
##########
qdp/qdp-core/src/readers/numpy.rs:
##########
@@ -219,6 +743,103 @@ mod tests {
fs::remove_file(temp_path).unwrap();
}
+ #[test]
+ fn test_numpy_streaming_reader_basic() {
+ let temp_path = "/tmp/test_numpy_streaming_basic.npy";
+ let num_samples = 4;
+ let sample_size = 3;
+
+ let data: Vec<f64> = (0..num_samples * sample_size).map(|i| i as
f64).collect();
+ let array = Array2::from_shape_vec((num_samples, sample_size),
data.clone()).unwrap();
+ ndarray_npy::write_npy(temp_path, &array).unwrap();
+
+ let mut reader = NumpyStreamingReader::new(temp_path).unwrap();
+ let mut buffer = vec![0.0; sample_size * 2];
+ let mut out = Vec::new();
+ loop {
+ let written = reader.read_chunk(&mut buffer).unwrap();
+ if written == 0 {
+ break;
+ }
+ out.extend_from_slice(&buffer[..written]);
+ }
+
+ assert_eq!(out, data);
+ assert_eq!(reader.total_rows(), num_samples);
+
+ fs::remove_file(temp_path).unwrap();
+ }
+
+ #[test]
+ fn test_numpy_streaming_reader_fortran_order() {
+ let temp_path = "/tmp/test_numpy_streaming_fortran.npy";
+ let num_samples = 3;
+ let sample_size = 4;
+
+ let data: Vec<f64> = (0..num_samples * sample_size).map(|i| i as
f64).collect();
+ let array = Array2::from_shape_vec((num_samples, sample_size),
data.clone()).unwrap();
+ let array_f = array.reversed_axes();
+ let array_f = array_f.as_standard_layout().reversed_axes();
+ ndarray_npy::write_npy(temp_path, &array_f).unwrap();
+
+ let mut reader = NumpyStreamingReader::new(temp_path).unwrap();
+ let mut buffer = vec![0.0; sample_size];
+ let mut out = Vec::new();
+ loop {
+ let written = reader.read_chunk(&mut buffer).unwrap();
+ if written == 0 {
+ break;
+ }
+ out.extend_from_slice(&buffer[..written]);
+ }
+
+ assert_eq!(out, data);
+
+ fs::remove_file(temp_path).unwrap();
Review Comment:
Test cleanup with fs::remove_file().unwrap() can panic if the file removal
fails (e.g., file already deleted, permission issues). This could cause the
test to panic even if the actual test logic passed. Consider using a result
pattern or ignore errors during cleanup to make tests more robust.
##########
qdp/qdp-core/src/io.rs:
##########
@@ -260,6 +260,24 @@ pub fn read_numpy_batch<P: AsRef<Path>>(path: P) ->
Result<(Vec<f64>, usize, usi
reader.read_batch()
}
+/// Reads batch data from a NumPy .npy file using streaming IO.
+///
+/// Avoids the intermediate `Array2` allocation used by `read_numpy_batch`.
+pub fn read_numpy_streaming_batch<P: AsRef<Path>>(path: P) ->
Result<(Vec<f64>, usize, usize)> {
+ use crate::reader::DataReader;
+ let mut reader = crate::readers::NumpyStreamingReader::new(path)?;
+ reader.read_batch()
+}
Review Comment:
The integration test suite in tests/numpy.rs doesn't include tests for the
newly added NumpyStreamingReader and NumpyMmapReader implementations. While
unit tests exist in the numpy.rs module itself, integration tests would help
ensure these readers work correctly in real-world scenarios and maintain API
compatibility. Consider adding integration tests similar to the existing
test_read_numpy_batch_function but using the streaming and mmap variants.
##########
qdp/qdp-core/src/readers/numpy.rs:
##########
@@ -18,13 +18,286 @@
//!
//! Provides support for reading .npy files containing 2D float64 arrays.
-use std::path::Path;
+use std::fs::File;
+use std::io::{Read, Seek, SeekFrom};
+use std::path::{Path, PathBuf};
+use memmap2::Mmap;
use ndarray::Array2;
use ndarray_npy::ReadNpyError;
use crate::error::{MahoutError, Result};
-use crate::reader::DataReader;
+use crate::reader::{DataReader, StreamingDataReader};
+
+const NPY_MAGIC: &[u8; 6] = b"\x93NUMPY";
+
+#[derive(Clone, Debug)]
+struct NpyHeader {
+ fortran_order: bool,
+ num_samples: usize,
+ sample_size: usize,
+ data_offset: u64,
+}
+
+impl NpyHeader {
+ fn total_elements(&self) -> usize {
+ self.num_samples * self.sample_size
+ }
+}
+
+fn parse_header_value<'a>(header: &'a str, key: &str) -> Result<&'a str> {
+ let key_single = format!("'{}'", key);
+ let mut start = header.find(&key_single);
+ if start.is_none() {
+ let key_double = format!("\"{}\"", key);
+ start = header.find(&key_double);
+ }
+ let start = start.ok_or_else(|| {
+ MahoutError::InvalidInput(format!("Missing '{}' entry in .npy header",
key))
+ })?;
+ let rest = &header[start..];
+ let colon = rest
+ .find(':')
+ .ok_or_else(|| MahoutError::InvalidInput("Malformed .npy
header".to_string()))?;
+ Ok(rest[colon + 1..].trim_start())
+}
+
+fn parse_quoted_value(header: &str, key: &str) -> Result<String> {
+ let rest = parse_header_value(header, key)?;
+ let mut chars = rest.chars();
+ let quote = chars
+ .next()
+ .ok_or_else(|| MahoutError::InvalidInput("Malformed .npy
header".to_string()))?;
+ if quote != '\'' && quote != '"' {
+ return Err(MahoutError::InvalidInput(format!(
+ "Expected quoted value for '{}'",
+ key
+ )));
+ }
+ let rest = &rest[1..];
+ let end = rest
+ .find(quote)
+ .ok_or_else(|| MahoutError::InvalidInput(format!("Unterminated string
for '{}'", key)))?;
+ Ok(rest[..end].to_string())
+}
+
+fn parse_bool_value(header: &str, key: &str) -> Result<bool> {
+ let rest = parse_header_value(header, key)?;
+ if rest.starts_with("True") {
+ Ok(true)
+ } else if rest.starts_with("False") {
+ Ok(false)
+ } else {
+ Err(MahoutError::InvalidInput(format!(
+ "Expected True/False for '{}'",
+ key
+ )))
+ }
+}
+
+fn parse_shape_value(header: &str, key: &str) -> Result<Vec<usize>> {
+ let rest = parse_header_value(header, key)?;
+ let rest = rest.trim_start();
+ if !rest.starts_with('(') {
+ return Err(MahoutError::InvalidInput(
+ "Malformed shape in .npy header".to_string(),
+ ));
+ }
+ let end = rest
+ .find(')')
+ .ok_or_else(|| MahoutError::InvalidInput("Malformed shape in .npy
header".to_string()))?;
+ let inner = &rest[1..end];
+ let mut dims = Vec::new();
+ for part in inner.split(',') {
+ let part = part.trim();
+ if part.is_empty() {
+ continue;
+ }
+ let value = part.parse::<usize>().map_err(|e| {
+ MahoutError::InvalidInput(format!("Invalid shape value '{}': {}",
part, e))
+ })?;
+ dims.push(value);
+ }
+ if dims.is_empty() {
+ return Err(MahoutError::InvalidInput(
+ "Empty shape in .npy header".to_string(),
+ ));
+ }
+ Ok(dims)
+}
+
+fn validate_descr(descr: &str) -> Result<()> {
+ let (endian, typecode) = match descr.chars().next() {
+ Some('<') | Some('>') | Some('|') | Some('=') => {
+ (Some(descr.chars().next().unwrap()), &descr[1..])
+ }
+ _ => (None, descr),
+ };
Review Comment:
    Minor nit: `descr.chars().next()` is evaluated twice here — once as the
    `match` scrutinee and again inside the arm via `.unwrap()`. Consider
    binding the character once (e.g. `Some(c) if matches!(c, '<' | '>' | '|'
    | '=') => (Some(c), &descr[1..])`) so the arm can reuse it without the
    redundant call and `unwrap()`. Note that the `endian` binding itself is
    not dead code — it is consumed by the later `if let Some('>') = endian`
    big-endian rejection — so the binding should stay.
##########
qdp/qdp-core/src/readers/numpy.rs:
##########
@@ -219,6 +743,103 @@ mod tests {
fs::remove_file(temp_path).unwrap();
}
+ #[test]
+ fn test_numpy_streaming_reader_basic() {
+ let temp_path = "/tmp/test_numpy_streaming_basic.npy";
Review Comment:
Tests use hardcoded /tmp paths which are not portable across operating
systems (e.g., Windows doesn't have /tmp). Consider using std::env::temp_dir()
or tempfile crate to generate platform-independent temporary file paths for
better cross-platform compatibility.
##########
qdp/qdp-core/src/readers/numpy.rs:
##########
@@ -155,9 +428,260 @@ impl DataReader for NumpyReader {
}
}
+/// Streaming reader for NumPy `.npy` files containing 2D float64 arrays.
+///
+/// Reads data in chunks without loading the entire file into memory.
+pub struct NumpyStreamingReader {
+ file: File,
+ header: NpyHeader,
+ row_cursor: usize,
+ column_buf: Vec<f64>,
+}
+
+impl NumpyStreamingReader {
+ /// Create a new streaming NumPy reader.
+ pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+ let path = path.as_ref();
+
+ match path.try_exists() {
+ Ok(false) => {
+ return Err(MahoutError::Io(format!(
+ "NumPy file not found: {}",
+ path.display()
+ )));
+ }
+ Err(e) => {
+ return Err(MahoutError::Io(format!(
+ "Failed to check if NumPy file exists at {}: {}",
+ path.display(),
+ e
+ )));
+ }
+ Ok(true) => {}
+ }
+
+ let mut file = File::open(path)
+ .map_err(|e| MahoutError::Io(format!("Failed to open NumPy file:
{}", e)))?;
+ let header = read_npy_header(path, &mut file)?;
+
+ Ok(Self {
+ file,
+ header,
+ row_cursor: 0,
+ column_buf: Vec::new(),
+ })
+ }
+}
+
+impl DataReader for NumpyStreamingReader {
+ fn read_batch(&mut self) -> Result<(Vec<f64>, usize, usize)> {
+ let total_elements = self.header.total_elements();
+ let mut data = vec![0.0; total_elements];
+ let mut written = 0;
+ while written < total_elements {
+ let n = self.read_chunk(&mut data[written..])?;
+ if n == 0 {
+ break;
+ }
+ written += n;
+ }
+ if written != total_elements {
+ data.truncate(written);
+ }
+
+ Ok((data, self.header.num_samples, self.header.sample_size))
+ }
+
+ fn get_sample_size(&self) -> Option<usize> {
+ Some(self.header.sample_size)
+ }
+
+ fn get_num_samples(&self) -> Option<usize> {
+ Some(self.header.num_samples)
+ }
+}
+
+impl StreamingDataReader for NumpyStreamingReader {
+ fn read_chunk(&mut self, buffer: &mut [f64]) -> Result<usize> {
+ if self.row_cursor >= self.header.num_samples {
+ return Ok(0);
+ }
+
+ let sample_size = self.header.sample_size;
+ let max_rows = buffer.len() / sample_size;
+ if max_rows == 0 {
+ return Err(MahoutError::InvalidInput(format!(
+ "Buffer too small for one sample (need {} elements)",
+ sample_size
+ )));
+ }
+
+ let remaining_rows = self.header.num_samples - self.row_cursor;
+ let rows_to_read = std::cmp::min(max_rows, remaining_rows);
+ let elem_count = rows_to_read * sample_size;
+
+ if !self.header.fortran_order {
+ let offset = self.header.data_offset
+ + (self.row_cursor * sample_size * std::mem::size_of::<f64>())
as u64;
+ read_f64s_at(&mut self.file, offset, &mut buffer[..elem_count])?;
+ } else {
+ if self.column_buf.len() < rows_to_read {
+ self.column_buf.resize(rows_to_read, 0.0);
+ }
+ for col in 0..sample_size {
+ let offset = self.header.data_offset
+ + ((col * self.header.num_samples + self.row_cursor)
+ * std::mem::size_of::<f64>()) as u64;
Review Comment:
Similar to line 525, the offset calculation could potentially overflow when
converting to u64. The nested multiplication `((col * self.header.num_samples +
self.row_cursor) * std::mem::size_of::<f64>()) as u64` could overflow on large
datasets before the cast. Consider using checked arithmetic or performing the
cast earlier in the calculation chain.
```suggestion
let element_byte_offset = (self.row_cursor as u64)
.checked_mul(sample_size as u64)
.and_then(|v| v.checked_mul(std::mem::size_of::<f64>() as
u64))
.and_then(|v| self.header.data_offset.checked_add(v))
.ok_or_else(|| {
MahoutError::InvalidInput(
"Computed file offset overflows u64 for C-order
data".to_string(),
)
})?;
read_f64s_at(
&mut self.file,
element_byte_offset,
&mut buffer[..elem_count],
)?;
} else {
if self.column_buf.len() < rows_to_read {
self.column_buf.resize(rows_to_read, 0.0);
}
for col in 0..sample_size {
let element_index = (col as u64)
.checked_mul(self.header.num_samples as u64)
.and_then(|v| v.checked_add(self.row_cursor as u64))
.ok_or_else(|| {
MahoutError::InvalidInput(
"Computed element index overflows u64 for
Fortran-order data"
.to_string(),
)
})?;
let offset = element_index
.checked_mul(std::mem::size_of::<f64>() as u64)
.and_then(|v| self.header.data_offset.checked_add(v))
.ok_or_else(|| {
MahoutError::InvalidInput(
"Computed file offset overflows u64 for
Fortran-order data"
.to_string(),
)
})?;
```
##########
qdp/qdp-core/src/readers/numpy.rs:
##########
@@ -219,6 +743,103 @@ mod tests {
fs::remove_file(temp_path).unwrap();
}
+ #[test]
+ fn test_numpy_streaming_reader_basic() {
+ let temp_path = "/tmp/test_numpy_streaming_basic.npy";
+ let num_samples = 4;
+ let sample_size = 3;
+
+ let data: Vec<f64> = (0..num_samples * sample_size).map(|i| i as
f64).collect();
+ let array = Array2::from_shape_vec((num_samples, sample_size),
data.clone()).unwrap();
+ ndarray_npy::write_npy(temp_path, &array).unwrap();
+
+ let mut reader = NumpyStreamingReader::new(temp_path).unwrap();
+ let mut buffer = vec![0.0; sample_size * 2];
+ let mut out = Vec::new();
+ loop {
+ let written = reader.read_chunk(&mut buffer).unwrap();
+ if written == 0 {
+ break;
+ }
+ out.extend_from_slice(&buffer[..written]);
+ }
+
+ assert_eq!(out, data);
+ assert_eq!(reader.total_rows(), num_samples);
+
+ fs::remove_file(temp_path).unwrap();
+ }
+
+ #[test]
+ fn test_numpy_streaming_reader_fortran_order() {
+ let temp_path = "/tmp/test_numpy_streaming_fortran.npy";
+ let num_samples = 3;
+ let sample_size = 4;
+
+ let data: Vec<f64> = (0..num_samples * sample_size).map(|i| i as
f64).collect();
+ let array = Array2::from_shape_vec((num_samples, sample_size),
data.clone()).unwrap();
+ let array_f = array.reversed_axes();
+ let array_f = array_f.as_standard_layout().reversed_axes();
+ ndarray_npy::write_npy(temp_path, &array_f).unwrap();
+
+ let mut reader = NumpyStreamingReader::new(temp_path).unwrap();
+ let mut buffer = vec![0.0; sample_size];
+ let mut out = Vec::new();
+ loop {
+ let written = reader.read_chunk(&mut buffer).unwrap();
+ if written == 0 {
+ break;
+ }
+ out.extend_from_slice(&buffer[..written]);
+ }
+
+ assert_eq!(out, data);
+
+ fs::remove_file(temp_path).unwrap();
+ }
+
+ #[test]
+ fn test_numpy_mmap_reader_basic() {
+ let temp_path = "/tmp/test_numpy_mmap_basic.npy";
+ let num_samples = 5;
+ let sample_size = 2;
+
+ let data: Vec<f64> = (0..num_samples * sample_size).map(|i| i as
f64).collect();
+ let array = Array2::from_shape_vec((num_samples, sample_size),
data.clone()).unwrap();
+ ndarray_npy::write_npy(temp_path, &array).unwrap();
+
+ let mut reader = NumpyMmapReader::new(temp_path).unwrap();
+ let (read_data, read_samples, read_size) =
reader.read_batch().unwrap();
+
+ assert_eq!(read_samples, num_samples);
+ assert_eq!(read_size, sample_size);
+ assert_eq!(read_data, data);
+
+ fs::remove_file(temp_path).unwrap();
Review Comment:
Test cleanup with `fs::remove_file().unwrap()` can panic if the file removal
fails (e.g., the file was already deleted, or a permission issue). This would
make the test panic even though the actual test logic passed. Consider either
ignoring errors during cleanup (`let _ = fs::remove_file(...)`) or using a
scoped temp-file guard so the tests are more robust.
##########
qdp/qdp-core/src/readers/numpy.rs:
##########
@@ -155,9 +428,260 @@ impl DataReader for NumpyReader {
}
}
/// Streaming reader for NumPy `.npy` files containing 2D float64 arrays.
///
/// Reads data in chunks without loading the entire file into memory.
pub struct NumpyStreamingReader {
    /// Open handle to the `.npy` file; reads are positioned explicitly.
    file: File,
    /// Parsed header: shape, element order, and byte offset of the data section.
    header: NpyHeader,
    /// Index of the next row to read; advanced by `read_chunk`.
    row_cursor: usize,
    /// Scratch buffer reused across calls when transposing Fortran-order
    /// columns into row-major output, to avoid per-chunk allocation.
    column_buf: Vec<f64>,
}
+
+impl NumpyStreamingReader {
+ /// Create a new streaming NumPy reader.
+ pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+ let path = path.as_ref();
+
+ match path.try_exists() {
+ Ok(false) => {
+ return Err(MahoutError::Io(format!(
+ "NumPy file not found: {}",
+ path.display()
+ )));
+ }
+ Err(e) => {
+ return Err(MahoutError::Io(format!(
+ "Failed to check if NumPy file exists at {}: {}",
+ path.display(),
+ e
+ )));
+ }
+ Ok(true) => {}
+ }
+
+ let mut file = File::open(path)
+ .map_err(|e| MahoutError::Io(format!("Failed to open NumPy file:
{}", e)))?;
+ let header = read_npy_header(path, &mut file)?;
+
+ Ok(Self {
+ file,
+ header,
+ row_cursor: 0,
+ column_buf: Vec::new(),
+ })
+ }
+}
+
+impl DataReader for NumpyStreamingReader {
+ fn read_batch(&mut self) -> Result<(Vec<f64>, usize, usize)> {
+ let total_elements = self.header.total_elements();
+ let mut data = vec![0.0; total_elements];
+ let mut written = 0;
+ while written < total_elements {
+ let n = self.read_chunk(&mut data[written..])?;
+ if n == 0 {
+ break;
+ }
+ written += n;
+ }
+ if written != total_elements {
+ data.truncate(written);
+ }
+
+ Ok((data, self.header.num_samples, self.header.sample_size))
+ }
+
+ fn get_sample_size(&self) -> Option<usize> {
+ Some(self.header.sample_size)
+ }
+
+ fn get_num_samples(&self) -> Option<usize> {
+ Some(self.header.num_samples)
+ }
+}
+
+impl StreamingDataReader for NumpyStreamingReader {
+ fn read_chunk(&mut self, buffer: &mut [f64]) -> Result<usize> {
+ if self.row_cursor >= self.header.num_samples {
+ return Ok(0);
+ }
+
+ let sample_size = self.header.sample_size;
+ let max_rows = buffer.len() / sample_size;
+ if max_rows == 0 {
+ return Err(MahoutError::InvalidInput(format!(
+ "Buffer too small for one sample (need {} elements)",
+ sample_size
+ )));
+ }
+
+ let remaining_rows = self.header.num_samples - self.row_cursor;
+ let rows_to_read = std::cmp::min(max_rows, remaining_rows);
+ let elem_count = rows_to_read * sample_size;
+
+ if !self.header.fortran_order {
+ let offset = self.header.data_offset
+ + (self.row_cursor * sample_size * std::mem::size_of::<f64>())
as u64;
+ read_f64s_at(&mut self.file, offset, &mut buffer[..elem_count])?;
+ } else {
+ if self.column_buf.len() < rows_to_read {
+ self.column_buf.resize(rows_to_read, 0.0);
+ }
+ for col in 0..sample_size {
+ let offset = self.header.data_offset
+ + ((col * self.header.num_samples + self.row_cursor)
+ * std::mem::size_of::<f64>()) as u64;
+ let column = &mut self.column_buf[..rows_to_read];
+ read_f64s_at(&mut self.file, offset, column)?;
+ for row in 0..rows_to_read {
+ buffer[row * sample_size + col] = column[row];
+ }
+ }
+ }
+
+ self.row_cursor += rows_to_read;
+ Ok(elem_count)
+ }
+
+ fn total_rows(&self) -> usize {
+ self.header.num_samples
+ }
+}
+
/// Memory-mapped reader for NumPy `.npy` files containing 2D float64 arrays.
///
/// Maps the file into memory and streams slices without an extra read + flatten pass.
pub struct NumpyMmapReader {
    /// Read-only memory map of the whole `.npy` file (header + data).
    mmap: Mmap,
    /// Parsed header: shape, element order, and byte offset of the data section.
    header: NpyHeader,
    /// Index of the next row to read; advanced by `read_chunk`.
    row_cursor: usize,
    /// Scratch buffer reused across calls when transposing Fortran-order
    /// columns into row-major output, to avoid per-chunk allocation.
    column_buf: Vec<f64>,
}
+
+impl NumpyMmapReader {
+ /// Create a new memory-mapped NumPy reader.
+ pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+ let path = path.as_ref();
+
+ match path.try_exists() {
+ Ok(false) => {
+ return Err(MahoutError::Io(format!(
+ "NumPy file not found: {}",
+ path.display()
+ )));
+ }
+ Err(e) => {
+ return Err(MahoutError::Io(format!(
+ "Failed to check if NumPy file exists at {}: {}",
+ path.display(),
+ e
+ )));
+ }
+ Ok(true) => {}
+ }
+
+ let mut file = File::open(path)
+ .map_err(|e| MahoutError::Io(format!("Failed to open NumPy file:
{}", e)))?;
+ let header = read_npy_header(path, &mut file)?;
+ let mmap = unsafe {
+ Mmap::map(&file)
+ .map_err(|e| MahoutError::Io(format!("Failed to mmap NumPy
file: {}", e)))?
+ };
+
+ Ok(Self {
+ mmap,
+ header,
+ row_cursor: 0,
+ column_buf: Vec::new(),
+ })
+ }
+}
+
+impl DataReader for NumpyMmapReader {
+ fn read_batch(&mut self) -> Result<(Vec<f64>, usize, usize)> {
+ let total_elements = self.header.total_elements();
+ let mut data = vec![0.0; total_elements];
+ let mut written = 0;
+ while written < total_elements {
+ let n = self.read_chunk(&mut data[written..])?;
+ if n == 0 {
+ break;
+ }
+ written += n;
+ }
+ if written != total_elements {
+ data.truncate(written);
+ }
+
+ Ok((data, self.header.num_samples, self.header.sample_size))
+ }
+
+ fn get_sample_size(&self) -> Option<usize> {
+ Some(self.header.sample_size)
+ }
+
+ fn get_num_samples(&self) -> Option<usize> {
+ Some(self.header.num_samples)
+ }
+}
+
+impl StreamingDataReader for NumpyMmapReader {
+ fn read_chunk(&mut self, buffer: &mut [f64]) -> Result<usize> {
+ if self.row_cursor >= self.header.num_samples {
+ return Ok(0);
+ }
+
+ let sample_size = self.header.sample_size;
+ let max_rows = buffer.len() / sample_size;
+ if max_rows == 0 {
+ return Err(MahoutError::InvalidInput(format!(
+ "Buffer too small for one sample (need {} elements)",
+ sample_size
+ )));
+ }
+
+ let remaining_rows = self.header.num_samples - self.row_cursor;
+ let rows_to_read = std::cmp::min(max_rows, remaining_rows);
+ let elem_count = rows_to_read * sample_size;
+ let data_base = self.header.data_offset as usize;
+
+ if !self.header.fortran_order {
+ let start = data_base + self.row_cursor * sample_size *
std::mem::size_of::<f64>();
Review Comment:
The offset calculation could potentially overflow when performing
multiplication before conversion to usize. The expression `self.row_cursor *
sample_size * std::mem::size_of::<f64>()` could overflow on large datasets.
Consider using checked arithmetic to prevent potential overflow and provide a
clear error message if it occurs.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]