This is an automated email from the ASF dual-hosted git repository.
guanmingchiu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/mahout.git
The following commit(s) were added to refs/heads/main by this push:
new 7ee09afe7 [QDP] File and streaming Quantum Data Loader (#1010)
7ee09afe7 is described below
commit 7ee09afe788a99333f40f1e67bf806981bb9cf80
Author: KUAN-HAO HUANG <[email protected]>
AuthorDate: Sat Feb 7 22:45:48 2026 +0800
[QDP] File and streaming Quantum Data Loader (#1010)
* [QDP] File and streaming Quantum Data Loader
* refactor and update
---
qdp/qdp-core/src/pipeline_runner.rs | 330 +++++++++++++++++++-
qdp/qdp-python/qumat_qdp/loader.py | 101 ++++--
qdp/qdp-python/src/lib.rs | 378 ++++++++++-------------
qdp/qdp-python/tests/test_dlpack_validation.py | 29 +-
qdp/qdp-python/tests/test_quantum_data_loader.py | 143 +++++++++
5 files changed, 717 insertions(+), 264 deletions(-)
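
For orientation, a minimal usage sketch of the loader API this commit adds, based on the builder
methods in qumat_qdp/loader.py below. The path and sizes here are hypothetical, and iterating
requires the Linux/CUDA build of the extension.

    from qumat_qdp.loader import QuantumDataLoader

    # Phase 2a: load the whole file once, then slice it into batches.
    file_loader = (
        QuantumDataLoader(device_id=0)
        .qubits(4)
        .batches(10, size=64)
        .source_file("/tmp/features.parquet")   # hypothetical path
    )

    # Phase 2b: stream a large Parquet file in chunks (.parquet only).
    streaming_loader = (
        QuantumDataLoader(device_id=0)
        .qubits(4)
        .batches(10, size=64)
        .source_file("/tmp/features.parquet", streaming=True)
    )

    for qt in file_loader:
        ...  # each item is a QuantumTensor holding one encoded batch on the GPU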
diff --git a/qdp/qdp-core/src/pipeline_runner.rs b/qdp/qdp-core/src/pipeline_runner.rs
index 9952e26ad..93d577b4b 100644
--- a/qdp/qdp-core/src/pipeline_runner.rs
+++ b/qdp/qdp-core/src/pipeline_runner.rs
@@ -18,11 +18,16 @@
// Python bindings release GIL during the run.
use std::f64::consts::PI;
+use std::path::Path;
+use std::sync::Mutex;
use std::time::Instant;
use crate::QdpEngine;
use crate::dlpack::DLManagedTensor;
-use crate::error::Result;
+use crate::error::{MahoutError, Result};
+use crate::io;
+use crate::reader::StreamingDataReader;
+use crate::readers::ParquetStreamingReader;
/// Configuration for throughput/latency pipeline runs (Python run_throughput_pipeline_py).
#[derive(Clone, Debug)]
@@ -58,14 +63,111 @@ pub struct PipelineRunResult {
pub latency_ms_per_vector: f64,
}
-/// Data source for the pipeline iterator (Phase 1: Synthetic only; Phase 2: File).
-#[derive(Debug)]
+/// Data source for the pipeline iterator (Phase 1: Synthetic; Phase 2a: InMemory; Phase 2b: Streaming).
pub enum DataSource {
Synthetic {
seed: u64,
batch_index: usize,
total_batches: usize,
},
+ /// Phase 2a: full file loaded once; iterator slices by batch_size.
+ InMemory {
+ data: Vec<f64>,
+ cursor: usize,
+ num_samples: usize,
+ sample_size: usize,
+ batches_yielded: usize,
+ batch_limit: usize,
+ },
+    /// Phase 2b: stream from Parquet in chunks; iterator refills buffer and encodes by batch.
+    /// Reader is in Mutex so PipelineIterator remains Sync (required by PyO3 pyclass).
+ Streaming {
+ reader: Mutex<ParquetStreamingReader>,
+ buffer: Vec<f64>,
+ buffer_cursor: usize,
+ read_chunk_scratch: Vec<f64>,
+ sample_size: usize,
+ batch_limit: usize,
+ batches_yielded: usize,
+ },
+}
+
+impl std::fmt::Debug for DataSource {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ DataSource::Synthetic {
+ seed,
+ batch_index,
+ total_batches,
+ } => f
+ .debug_struct("Synthetic")
+ .field("seed", seed)
+ .field("batch_index", batch_index)
+ .field("total_batches", total_batches)
+ .finish(),
+ DataSource::InMemory {
+ cursor,
+ num_samples,
+ sample_size,
+ batches_yielded,
+ batch_limit,
+ ..
+ } => f
+ .debug_struct("InMemory")
+ .field("cursor", cursor)
+ .field("num_samples", num_samples)
+ .field("sample_size", sample_size)
+ .field("batches_yielded", batches_yielded)
+ .field("batch_limit", batch_limit)
+ .finish(),
+ DataSource::Streaming {
+ buffer,
+ buffer_cursor,
+ sample_size,
+ batch_limit,
+ batches_yielded,
+ ..
+ } => f
+ .debug_struct("Streaming")
+ .field("buffer_len", &buffer.len())
+ .field("buffer_cursor", buffer_cursor)
+ .field("sample_size", sample_size)
+ .field("batch_limit", batch_limit)
+ .field("batches_yielded", batches_yielded)
+ .finish(),
+ }
+ }
+}
+
+/// Default Parquet row group size for streaming reader (tunable).
+const DEFAULT_PARQUET_ROW_GROUP_SIZE: usize = 2048;
+
+/// When buffer_cursor >= buffer.len() / BUFFER_COMPACT_DENOM, compact by draining consumed prefix.
+const BUFFER_COMPACT_DENOM: usize = 2;
+
+/// Returns the path extension as lowercase ASCII (e.g. "parquet"), or None if missing/non-UTF8.
+fn path_extension_lower(path: &Path) -> Option<String> {
+ path.extension()
+ .and_then(|e| e.to_str())
+ .map(|s| s.to_lowercase())
+}
+
+/// Dispatches by path extension to the appropriate io reader. Returns (data, num_samples, sample_size).
+/// Unsupported or missing extension returns Err with message listing supported formats.
+fn read_file_by_extension(path: &Path) -> Result<(Vec<f64>, usize, usize)> {
+ let ext_lower = path_extension_lower(path);
+ let ext = ext_lower.as_deref();
+ match ext {
+ Some("parquet") => io::read_parquet_batch(path),
+ Some("arrow") | Some("feather") | Some("ipc") =>
io::read_arrow_ipc_batch(path),
+ Some("npy") => io::read_numpy_batch(path),
+ Some("pt") | Some("pth") => io::read_torch_batch(path),
+ Some("pb") => io::read_tensorflow_batch(path),
+ _ => Err(MahoutError::InvalidInput(format!(
+ "Unsupported file extension {:?}. Supported: .parquet, .arrow,
.feather, .ipc, .npy, .pt, .pth, .pb",
+ path.extension()
+ ))),
+ }
}
/// Stateful iterator that yields one batch DLPack at a time for Python `for` loop consumption.
@@ -77,6 +179,9 @@ pub struct PipelineIterator {
vector_len: usize,
}
+/// (batch_data, batch_n, sample_size, num_qubits) from one source pull.
+type BatchFromSource = (Vec<f64>, usize, usize, usize);
+
impl PipelineIterator {
/// Create a new synthetic-data pipeline iterator.
    pub fn new_synthetic(engine: QdpEngine, config: PipelineConfig) -> Result<Self> {
@@ -94,26 +199,227 @@ impl PipelineIterator {
})
}
- /// Returns the next batch as a DLPack pointer; `Ok(None)` when exhausted.
- pub fn next_batch(&mut self) -> Result<Option<*mut DLManagedTensor>> {
- let (batch_data, num_qubits) = match &mut self.source {
+    /// Create a pipeline iterator from a file (Phase 2a: load full file then slice by batch).
+    /// Dispatches by path extension; validates dimensions at construction.
+    ///
+    /// Supported extensions: .parquet, .arrow, .feather, .ipc, .npy, .pt, .pth, .pb.
+    /// For file source, `batch_limit` caps batches yielded (e.g. for testing); use `usize::MAX` to iterate until EOF.
+ pub fn new_from_file<P: AsRef<Path>>(
+ engine: QdpEngine,
+ path: P,
+ config: PipelineConfig,
+ batch_limit: usize,
+ ) -> Result<Self> {
+ let path = path.as_ref();
+ let (data, num_samples, sample_size) = read_file_by_extension(path)?;
+        let vector_len = vector_len(config.num_qubits, &config.encoding_method);
+
+ // Dimension validation at construction.
+ if sample_size != vector_len {
+ return Err(MahoutError::InvalidInput(format!(
+ "File feature length {} does not match vector_len {} for
num_qubits={}, encoding={}",
+ sample_size, vector_len, config.num_qubits,
config.encoding_method
+ )));
+ }
+ if data.len() != num_samples * sample_size {
+ return Err(MahoutError::InvalidInput(format!(
+ "File data length {} is not num_samples ({}) * sample_size
({})",
+ data.len(),
+ num_samples,
+ sample_size
+ )));
+ }
+
+ let source = DataSource::InMemory {
+ data,
+ cursor: 0,
+ num_samples,
+ sample_size,
+ batches_yielded: 0,
+ batch_limit,
+ };
+ Ok(Self {
+ engine,
+ config,
+ source,
+ vector_len,
+ })
+ }
+
+    /// Create a pipeline iterator from a Parquet file using streaming read (Phase 2b).
+    /// Only `.parquet` is supported; reduces memory for large files by reading in chunks.
+ /// Validates sample_size == vector_len after the first chunk.
+ pub fn new_from_file_streaming<P: AsRef<Path>>(
+ engine: QdpEngine,
+ path: P,
+ config: PipelineConfig,
+ batch_limit: usize,
+ ) -> Result<Self> {
+ let path = path.as_ref();
+ if path_extension_lower(path).as_deref() != Some("parquet") {
+ return Err(MahoutError::InvalidInput(format!(
+ "Streaming file loader supports only .parquet; got extension
{:?}. Use .source_file(path) for other formats.",
+ path.extension()
+ )));
+ }
+
+        let mut reader = ParquetStreamingReader::new(path, Some(DEFAULT_PARQUET_ROW_GROUP_SIZE))?;
+        let vector_len = vector_len(config.num_qubits, &config.encoding_method);
+
+ // Read first chunk to learn sample_size; reuse as initial buffer.
+ const INITIAL_CHUNK_CAP: usize = 64 * 1024;
+ let mut buffer = vec![0.0; INITIAL_CHUNK_CAP];
+ let written = reader.read_chunk(&mut buffer)?;
+ if written == 0 {
+ return Err(MahoutError::InvalidInput(
+ "Parquet file is empty or contains no data.".to_string(),
+ ));
+ }
+ let sample_size = reader.get_sample_size().ok_or_else(|| {
+ MahoutError::InvalidInput(
+ "Parquet streaming reader did not set sample_size after first
chunk.".to_string(),
+ )
+ })?;
+
+ if sample_size != vector_len {
+ return Err(MahoutError::InvalidInput(format!(
+ "File feature length {} does not match vector_len {} for
num_qubits={}, encoding={}",
+ sample_size, vector_len, config.num_qubits,
config.encoding_method
+ )));
+ }
+
+ buffer.truncate(written);
+ let read_chunk_scratch = vec![0.0; INITIAL_CHUNK_CAP];
+
+ let source = DataSource::Streaming {
+ reader: Mutex::new(reader),
+ buffer,
+ buffer_cursor: 0,
+ read_chunk_scratch,
+ sample_size,
+ batch_limit,
+ batches_yielded: 0,
+ };
+ Ok(Self {
+ engine,
+ config,
+ source,
+ vector_len,
+ })
+ }
+
+    /// Yields the next batch data from the current source; `None` when exhausted.
+ /// Returns (batch_data, batch_n, sample_size, num_qubits).
+ fn take_batch_from_source(&mut self) -> Result<Option<BatchFromSource>> {
+ Ok(match &mut self.source {
DataSource::Synthetic {
batch_index,
total_batches,
..
} => {
if *batch_index >= *total_batches {
- return Ok(None);
+ None
+ } else {
+                let data = generate_batch(&self.config, *batch_index, self.vector_len);
+ *batch_index += 1;
+ Some((
+ data,
+ self.config.batch_size,
+ self.vector_len,
+ self.config.num_qubits as usize,
+ ))
+ }
+ }
+ DataSource::InMemory {
+ data,
+ cursor,
+ sample_size,
+ batches_yielded,
+ batch_limit,
+ ..
+ } => {
+ if *batches_yielded >= *batch_limit {
+ None
+ } else {
+ let remaining = (data.len() - *cursor) / *sample_size;
+ if remaining == 0 {
+ None
+ } else {
+ let batch_n = remaining.min(self.config.batch_size);
+ let start = *cursor;
+ let end = start + batch_n * *sample_size;
+ *cursor = end;
+ *batches_yielded += 1;
+ let slice = data[start..end].to_vec();
+ Some((
+ slice,
+ batch_n,
+ *sample_size,
+ self.config.num_qubits as usize,
+ ))
+ }
}
-            let data = generate_batch(&self.config, *batch_index, self.vector_len);
- *batch_index += 1;
- (data, self.config.num_qubits as usize)
}
+ DataSource::Streaming {
+ reader,
+ buffer,
+ buffer_cursor,
+ read_chunk_scratch,
+ sample_size,
+ batch_limit,
+ batches_yielded,
+ } => {
+ if *batches_yielded >= *batch_limit {
+ None
+ } else {
+ let required = self.config.batch_size * *sample_size;
+ while (buffer.len() - *buffer_cursor) < required {
+ let r = reader.get_mut().map_err(|e| {
+                            MahoutError::Io(format!("Streaming reader mutex poisoned: {}", e))
+ })?;
+ let written = r.read_chunk(read_chunk_scratch)?;
+ if written == 0 {
+ break;
+ }
+                        buffer.extend_from_slice(&read_chunk_scratch[..written]);
+ }
+ let available = buffer.len() - *buffer_cursor;
+ let available_samples = available / *sample_size;
+ if available_samples == 0 {
+ None
+ } else {
+                        let batch_n = available_samples.min(self.config.batch_size);
+ let start = *buffer_cursor;
+ let end = start + batch_n * *sample_size;
+ *buffer_cursor = end;
+ *batches_yielded += 1;
+ let slice = buffer[start..end].to_vec();
+                        if *buffer_cursor >= buffer.len() / BUFFER_COMPACT_DENOM {
+ buffer.drain(..*buffer_cursor);
+ *buffer_cursor = 0;
+ }
+ Some((
+ slice,
+ batch_n,
+ *sample_size,
+ self.config.num_qubits as usize,
+ ))
+ }
+ }
+ }
+ })
+ }
+
+ /// Returns the next batch as a DLPack pointer; `Ok(None)` when exhausted.
+ pub fn next_batch(&mut self) -> Result<Option<*mut DLManagedTensor>> {
+        let Some((batch_data, batch_n, sample_size, num_qubits)) = self.take_batch_from_source()?
+ else {
+ return Ok(None);
};
let ptr = self.engine.encode_batch(
&batch_data,
- self.config.batch_size,
- self.vector_len,
+ batch_n,
+ sample_size,
num_qubits,
&self.config.encoding_method,
)?;
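
The file path above is dispatched by extension, and new_from_file rejects inputs whose per-sample
feature length does not equal vector_len for the chosen encoding. A hedged sketch of producing a
matching Parquet file with pyarrow follows; it assumes vector_len for amplitude encoding is
2**num_qubits and that read_parquet_batch accepts one float64 column per feature (neither the
vector_len helper nor the expected Parquet schema is shown in this diff).

    import numpy as np
    import pyarrow as pa
    import pyarrow.parquet as pq

    num_qubits = 4
    sample_size = 2 ** num_qubits      # assumed vector_len for amplitude encoding
    num_samples = 256

    rng = np.random.default_rng(7)
    data = rng.random((num_samples, sample_size))

    # One float64 column per feature (assumed layout).
    table = pa.table({f"f{i}": data[:, i] for i in range(sample_size)})
    # Row group size chosen to match DEFAULT_PARQUET_ROW_GROUP_SIZE above.
    pq.write_table(table, "/tmp/features.parquet", row_group_size=2048)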
diff --git a/qdp/qdp-python/qumat_qdp/loader.py b/qdp/qdp-python/qumat_qdp/loader.py
index d5c37d65a..6c908325d 100644
--- a/qdp/qdp-python/qumat_qdp/loader.py
+++ b/qdp/qdp-python/qumat_qdp/loader.py
@@ -35,13 +35,9 @@ from typing import TYPE_CHECKING, Iterator, Optional
if TYPE_CHECKING:
import _qdp # noqa: F401 -- for type checkers only
-# Rust interface expects seed as Option<u64>: non-negative and <= 2^64 - 1.
-# Ref: qdp-core PipelineConfig seed: Option<u64>
+# Seed must fit Rust u64: 0 <= seed <= 2^64 - 1.
_U64_MAX = 2**64 - 1
-# Lazy import _qdp at runtime until __iter__ is used; TYPE_CHECKING import above
-# is for type checkers only so they can resolve "_qdp.*" annotations if needed.
-
@lru_cache(maxsize=1)
def _get_qdp():
@@ -116,6 +112,12 @@ class QuantumDataLoader:
self._total_batches = total_batches
self._encoding_method = encoding_method
self._seed = seed
+ self._file_path: Optional[str] = None
+ self._streaming_requested = (
+ False # set True by source_file(streaming=True); Phase 2b
+ )
+        self._synthetic_requested = False  # set True only by source_synthetic()
+ self._file_requested = False
def qubits(self, n: int) -> QuantumDataLoader:
"""Set number of qubits. Returns self for chaining."""
@@ -148,6 +150,7 @@ class QuantumDataLoader:
total_batches: Optional[int] = None,
) -> QuantumDataLoader:
"""Use synthetic data source (default). Optionally override
total_batches. Returns self."""
+ self._synthetic_requested = True
if total_batches is not None:
if not isinstance(total_batches, int) or total_batches < 1:
raise ValueError(
@@ -156,6 +159,23 @@ class QuantumDataLoader:
self._total_batches = total_batches
return self
+    def source_file(self, path: str, streaming: bool = False) -> QuantumDataLoader:
+        """Use file data source. Path must point to a supported format. Returns self.
+
+        For streaming=True (Phase 2b), only .parquet is supported; data is read in chunks to reduce memory.
+        For streaming=False, supports .parquet, .arrow, .feather, .ipc, .npy, .pt, .pth, .pb.
+ """
+ if not path or not isinstance(path, str):
+ raise ValueError(f"path must be a non-empty string, got {path!r}")
+ if streaming and not (path.lower().endswith(".parquet")):
+ raise ValueError(
+ "streaming=True supports only .parquet files; use
streaming=False for other formats."
+ )
+ self._file_path = path
+ self._file_requested = True
+ self._streaming_requested = streaming
+ return self
+
def seed(self, s: Optional[int] = None) -> QuantumDataLoader:
"""Set RNG seed for reproducible synthetic data (must fit Rust u64: 0
<= seed <= 2^64-1). Returns self."""
if s is not None:
@@ -170,16 +190,26 @@ class QuantumDataLoader:
self._seed = s
return self
- def __iter__(self) -> Iterator[object]:
- """Return Rust-backed iterator that yields one QuantumTensor per
batch."""
- _validate_loader_args(
- device_id=self._device_id,
- num_qubits=self._num_qubits,
- batch_size=self._batch_size,
- total_batches=self._total_batches,
- encoding_method=self._encoding_method,
- seed=self._seed,
- )
+ def _create_iterator(self) -> Iterator[object]:
+ """Build engine and return the Rust-backed loader iterator (synthetic
or file)."""
+ if self._synthetic_requested and self._file_requested:
+ raise ValueError(
+ "Cannot set both synthetic and file sources; use either
.source_synthetic() or .source_file(path), not both."
+ )
+ if self._file_requested and self._file_path is None:
+ raise ValueError(
+ "source_file() was not called with a path; set file source
with .source_file(path)."
+ )
+ use_synthetic = not self._file_requested
+ if use_synthetic:
+ _validate_loader_args(
+ device_id=self._device_id,
+ num_qubits=self._num_qubits,
+ batch_size=self._batch_size,
+ total_batches=self._total_batches,
+ encoding_method=self._encoding_method,
+ seed=self._seed,
+ )
qdp = _get_qdp()
QdpEngine = getattr(qdp, "QdpEngine", None)
if QdpEngine is None:
@@ -187,16 +217,43 @@ class QuantumDataLoader:
"_qdp.QdpEngine not found. Build the extension with maturin
develop."
)
engine = QdpEngine(device_id=self._device_id)
+ if not use_synthetic:
+ if self._streaming_requested:
+                create_loader = getattr(engine, "create_streaming_file_loader", None)
+ if create_loader is None:
+ raise RuntimeError(
+ "create_streaming_file_loader not available (e.g. only
on Linux with CUDA)."
+ )
+ else:
+ create_loader = getattr(engine, "create_file_loader", None)
+ if create_loader is None:
+ raise RuntimeError(
+ "create_file_loader not available (e.g. only on Linux
with CUDA)."
+ )
+ return iter(
+ create_loader(
+ path=self._file_path,
+ batch_size=self._batch_size,
+ num_qubits=self._num_qubits,
+ encoding_method=self._encoding_method,
+ batch_limit=None,
+ )
+ )
        create_synthetic_loader = getattr(engine, "create_synthetic_loader", None)
if create_synthetic_loader is None:
raise RuntimeError(
"create_synthetic_loader not available (e.g. only on Linux
with CUDA)."
)
- loader = create_synthetic_loader(
- total_batches=self._total_batches,
- batch_size=self._batch_size,
- num_qubits=self._num_qubits,
- encoding_method=self._encoding_method,
- seed=self._seed,
+ return iter(
+ create_synthetic_loader(
+ total_batches=self._total_batches,
+ batch_size=self._batch_size,
+ num_qubits=self._num_qubits,
+ encoding_method=self._encoding_method,
+ seed=self._seed,
+ )
)
- return iter(loader)
+
+ def __iter__(self) -> Iterator[object]:
+ """Return Rust-backed iterator that yields one QuantumTensor per
batch."""
+ return self._create_iterator()
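
Once a source is set, iterating the loader yields one QuantumTensor per batch. Since QuantumTensor
implements the DLPack protocol (see qdp-python/src/lib.rs below), each batch can be handed to a
DLPack consumer such as PyTorch. A minimal consumption sketch, assuming the Linux/CUDA build and
that torch.from_dlpack can take the tensor directly:

    import torch
    from qumat_qdp.loader import QuantumDataLoader

    loader = (
        QuantumDataLoader(device_id=0)
        .qubits(4)
        .batches(5, size=8)
        .source_synthetic()
        .seed(1234)
    )

    for qt in loader:
        state = torch.from_dlpack(qt)
        print(state.shape, state.device, state.dtype)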
diff --git a/qdp/qdp-python/src/lib.rs b/qdp/qdp-python/src/lib.rs
index ff1083f97..7afea5284 100644
--- a/qdp/qdp-python/src/lib.rs
+++ b/qdp/qdp-python/src/lib.rs
@@ -15,21 +15,13 @@
// limitations under the License.
use numpy::{PyReadonlyArray1, PyReadonlyArray2, PyUntypedArrayMethods};
-use pyo3::exceptions::{PyRuntimeError, PyStopIteration};
+use pyo3::exceptions::PyRuntimeError;
use pyo3::ffi;
use pyo3::prelude::*;
use qdp_core::dlpack::{DL_FLOAT, DLDeviceType, DLManagedTensor};
use qdp_core::{Precision, QdpEngine as CoreEngine};
use std::ffi::c_void;
-#[cfg(target_os = "linux")]
-use qdp_core::{PipelineConfig, PipelineIterator, PipelineRunResult, run_throughput_pipeline};
-
-/// Wraps raw DLPack pointer so it can cross `py.allow_threads` (closure return must be `Send`).
-/// Safe: DLPack pointer handover across contexts; GIL is released only during the closure.
-struct SendPtr(pub *mut DLManagedTensor);
-unsafe impl Send for SendPtr {}
-
/// Quantum tensor wrapper implementing DLPack protocol
///
/// This class wraps a GPU-allocated quantum state vector and implements
@@ -157,74 +149,6 @@ impl Drop for QuantumTensor {
unsafe impl Send for QuantumTensor {}
unsafe impl Sync for QuantumTensor {}
-/// Python iterator yielding one QuantumTensor (batch) per __next__. Releases GIL during next_batch().
-#[cfg(target_os = "linux")]
-#[pyclass]
-struct PyQuantumLoader {
- inner: Option<PipelineIterator>,
-}
-
-#[cfg(target_os = "linux")]
-#[pymethods]
-impl PyQuantumLoader {
- fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
- slf
- }
-
-    /// Returns the next batch as QuantumTensor; raises StopIteration when exhausted. Releases GIL during encode.
-    fn __next__(mut slf: PyRefMut<'_, Self>, py: Python<'_>) -> PyResult<QuantumTensor> {
- let mut inner_iter = match slf.inner.take() {
- Some(it) => it,
- None => return Err(PyStopIteration::new_err("loader exhausted")),
- };
-
- #[allow(deprecated)]
- let result = py.allow_threads(move || {
- let res = inner_iter.next_batch();
- match res {
- Ok(Some(ptr)) => Ok((inner_iter, Some(SendPtr(ptr)))),
- Ok(None) => Ok((inner_iter, None)),
- Err(e) => Err((inner_iter, e)),
- }
- });
-
- match result {
- Ok((returned_iter, Some(send_ptr))) => {
- slf.inner = Some(returned_iter);
- Ok(QuantumTensor {
- ptr: send_ptr.0,
- consumed: false,
- })
- }
- Ok((_, None)) => Err(PyStopIteration::new_err("loader exhausted")),
- Err((returned_iter, e)) => {
- slf.inner = Some(returned_iter);
- Err(PyRuntimeError::new_err(e.to_string()))
- }
- }
- }
-}
-
-/// Stub PyQuantumLoader when not on Linux (CUDA pipeline not available).
-#[cfg(not(target_os = "linux"))]
-#[pyclass]
-struct PyQuantumLoader {}
-
-#[cfg(not(target_os = "linux"))]
-#[pymethods]
-impl PyQuantumLoader {
- fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
- slf
- }
-
- fn __next__(&self, _py: Python<'_>) -> PyResult<QuantumTensor> {
- Err(PyRuntimeError::new_err(
- "QuantumDataLoader is only available on Linux (CUDA pipeline). \
- Build and run from a Linux host with CUDA.",
- ))
- }
-}
-
/// Helper to detect PyTorch tensor
fn is_pytorch_tensor(obj: &Bound<'_, PyAny>) -> PyResult<bool> {
let type_obj = obj.get_type();
@@ -340,9 +264,12 @@ fn get_torch_cuda_stream_ptr(tensor: &Bound<'_, PyAny>) -> PyResult<*mut c_void>
)));
}
-    // PyTorch default stream can report cuda_stream as 0; treat as valid (Rust sync is no-op for null).
let stream_ptr: u64 = stream.getattr("cuda_stream")?.extract()?;
- Ok(stream_ptr as *mut c_void)
+ Ok(if stream_ptr == 0 {
+ std::ptr::null_mut()
+ } else {
+ stream_ptr as *mut c_void
+ })
}
/// Validate a CUDA tensor for direct GPU encoding
@@ -1123,22 +1050,10 @@ impl QdpEngine {
})
}
-    /// Create a synthetic-data loader iterator for use in Python `for qt in loader`.
- ///
-    /// Yields one QuantumTensor (batch) per iteration; releases GIL during encode.
- /// Use with QuantumDataLoader builder or directly for streaming encode.
- ///
- /// Args:
- /// total_batches: Number of batches to yield
- /// batch_size: Samples per batch
- /// num_qubits: Qubits per sample
- /// encoding_method: "amplitude", "angle", or "basis"
- /// seed: Optional RNG seed for reproducible synthetic data
- ///
- /// Returns:
- /// PyQuantumLoader: iterator yielding QuantumTensor per __next__
+ // --- Loader factory methods (Linux only) ---
#[cfg(target_os = "linux")]
-    #[pyo3(signature = (total_batches, batch_size=64, num_qubits=16, encoding_method="amplitude", seed=None))]
+    /// Create a synthetic-data pipeline iterator (for QuantumDataLoader.source_synthetic()).
+    #[pyo3(signature = (total_batches, batch_size, num_qubits, encoding_method, seed=None))]
fn create_synthetic_loader(
&self,
total_batches: usize,
@@ -1147,168 +1062,193 @@ impl QdpEngine {
encoding_method: &str,
seed: Option<u64>,
) -> PyResult<PyQuantumLoader> {
- let config = PipelineConfig {
- device_id: self.engine.device().ordinal(),
- num_qubits,
+ let config = config_from_args(
+ &self.engine,
batch_size,
+ num_qubits,
+ encoding_method,
total_batches,
- encoding_method: encoding_method.to_string(),
seed,
- warmup_batches: 0,
- };
- let iter = PipelineIterator::new_synthetic(self.engine.clone(), config)
- .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
- Ok(PyQuantumLoader { inner: Some(iter) })
+ );
+        let iter = qdp_core::PipelineIterator::new_synthetic(self.engine.clone(), config).map_err(
+            |e| PyRuntimeError::new_err(format!("create_synthetic_loader failed: {}", e)),
+ )?;
+ Ok(PyQuantumLoader::new(Some(iter)))
}
-    /// Stub when not on Linux: create_synthetic_loader is only implemented on Linux.
-    #[cfg(not(target_os = "linux"))]
-    #[pyo3(signature = (total_batches, batch_size=64, num_qubits=16, encoding_method="amplitude", seed=None))]
-    fn create_synthetic_loader(
+    #[cfg(target_os = "linux")]
+    /// Create a file-backed pipeline iterator (full read then batch; for QuantumDataLoader.source_file(path)).
+    #[pyo3(signature = (path, batch_size, num_qubits, encoding_method, batch_limit=None))]
+ fn create_file_loader(
&self,
- total_batches: usize,
+ py: Python<'_>,
+ path: &Bound<'_, PyAny>,
batch_size: usize,
num_qubits: u32,
encoding_method: &str,
- seed: Option<u64>,
+ batch_limit: Option<usize>,
) -> PyResult<PyQuantumLoader> {
- let _ = (total_batches, batch_size, num_qubits, encoding_method, seed);
- Err(PyRuntimeError::new_err(
- "create_synthetic_loader is only available on Linux (CUDA
pipeline). \
- Build and run from a Linux host with CUDA.",
- ))
+ let path_str = path_from_py(path)?;
+ let batch_limit = batch_limit.unwrap_or(usize::MAX);
+ let config = config_from_args(
+ &self.engine,
+ batch_size,
+ num_qubits,
+ encoding_method,
+ 0,
+ None,
+ );
+ let engine = self.engine.clone();
+ let iter = py
+ .detach(|| {
+ qdp_core::PipelineIterator::new_from_file(
+ engine,
+ path_str.as_str(),
+ config,
+ batch_limit,
+ )
+ })
+ .map_err(|e| PyRuntimeError::new_err(format!("create_file_loader
failed: {}", e)))?;
+ Ok(PyQuantumLoader::new(Some(iter)))
}
-    /// Run dual-stream pipeline for encoding (H2D + kernel overlap). Internal API.
-    ///
-    /// Exposes run_dual_stream_pipeline from qdp-core. Accepts 1D host data (single sample).
-    /// Does not return a tensor; use for throughput measurement or when state is not needed.
- /// Currently supports amplitude encoding only.
- ///
- /// Args:
- /// host_data: 1D input (list or NumPy array, float64)
- /// num_qubits: Number of qubits
- /// encoding_method: "amplitude" (other methods not yet supported for
this path)
#[cfg(target_os = "linux")]
- fn _encode_stream_internal(
+    /// Create a streaming Parquet pipeline iterator (for QuantumDataLoader.source_file(path, streaming=True)).
+    #[pyo3(signature = (path, batch_size, num_qubits, encoding_method, batch_limit=None))]
+ fn create_streaming_file_loader(
&self,
- host_data: &Bound<'_, PyAny>,
- num_qubits: usize,
+ py: Python<'_>,
+ path: &Bound<'_, PyAny>,
+ batch_size: usize,
+ num_qubits: u32,
encoding_method: &str,
- ) -> PyResult<()> {
- let data_slice: Vec<f64> = if
host_data.hasattr("__array_interface__")? {
- let array_1d =
host_data.extract::<PyReadonlyArray1<f64>>().map_err(|_| {
- PyRuntimeError::new_err("host_data must be 1D NumPy array with
dtype float64")
+ batch_limit: Option<usize>,
+ ) -> PyResult<PyQuantumLoader> {
+ let path_str = path_from_py(path)?;
+ let batch_limit = batch_limit.unwrap_or(usize::MAX);
+ let config = config_from_args(
+ &self.engine,
+ batch_size,
+ num_qubits,
+ encoding_method,
+ 0,
+ None,
+ );
+ let engine = self.engine.clone();
+ let iter = py
+ .detach(|| {
+ qdp_core::PipelineIterator::new_from_file_streaming(
+ engine,
+ path_str.as_str(),
+ config,
+ batch_limit,
+ )
+ })
+ .map_err(|e| {
+ PyRuntimeError::new_err(format!("create_streaming_file_loader
failed: {}", e))
})?;
- array_1d
- .as_slice()
- .map_err(|_| PyRuntimeError::new_err("NumPy array must be
contiguous (C-order)"))?
- .to_vec()
- } else {
- host_data.extract::<Vec<f64>>().map_err(|_| {
- PyRuntimeError::new_err("host_data must be 1D list/array of
float64")
- })?
- };
- self.engine
- .run_dual_stream_encode(&data_slice, num_qubits, encoding_method)
-            .map_err(|e| PyRuntimeError::new_err(format!("run_dual_stream_encode failed: {}", e)))
+ Ok(PyQuantumLoader::new(Some(iter)))
}
}
-/// Runs the full throughput pipeline in Rust with GIL released. Returns (duration_sec, vectors_per_sec, latency_ms_per_vector).
+// --- Loader bindings (Linux only; qdp-core pipeline types only built on Linux) ---
#[cfg(target_os = "linux")]
-#[pyfunction]
-#[pyo3(signature = (device_id=0, num_qubits=16, batch_size=64, total_batches=100, encoding_method="amplitude", warmup_batches=0, seed=None))]
-#[allow(clippy::too_many_arguments)]
-fn run_throughput_pipeline_py_impl(
- py: Python<'_>,
- device_id: usize,
- num_qubits: u32,
- batch_size: usize,
- total_batches: usize,
- encoding_method: &str,
- warmup_batches: usize,
- seed: Option<u64>,
-) -> PyResult<(f64, f64, f64)> {
- let encoding_method = encoding_method.to_string();
- #[allow(deprecated)]
- let result: Result<PipelineRunResult, qdp_core::MahoutError> =
py.allow_threads(move || {
- let config = PipelineConfig {
- device_id,
+mod loader_bindings {
+ use super::*;
+ use pyo3::exceptions::PyStopIteration;
+ use qdp_core::{PipelineConfig, PipelineIterator};
+
+    /// Rust-backed iterator yielding one QuantumTensor per batch; used by QuantumDataLoader.
+ #[pyclass]
+ pub struct PyQuantumLoader {
+ inner: Option<PipelineIterator>,
+ }
+
+ impl PyQuantumLoader {
+ pub fn new(inner: Option<PipelineIterator>) -> Self {
+ Self { inner }
+ }
+ }
+
+ #[pymethods]
+ impl PyQuantumLoader {
+ fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
+ slf
+ }
+
+ fn __next__(mut slf: PyRefMut<'_, Self>) -> PyResult<QuantumTensor> {
+ let mut iter: PipelineIterator = match slf.inner.take() {
+ Some(i) => i,
+ None => return Err(PyStopIteration::new_err("")),
+ };
+            // Call next_batch without releasing GIL (return type *mut DLManagedTensor is !Send).
+ let result = iter.next_batch();
+ match result {
+ Ok(Some(ptr)) => {
+ slf.inner = Some(iter);
+ Ok(QuantumTensor {
+ ptr,
+ consumed: false,
+ })
+ }
+ Ok(None) => {
+ // Exhausted; do not put iterator back
+ Err(PyStopIteration::new_err(""))
+ }
+ Err(e) => {
+ slf.inner = Some(iter);
+ Err(PyRuntimeError::new_err(format!(
+ "Pipeline next_batch failed: {}",
+ e
+ )))
+ }
+ }
+ }
+ }
+
+    /// Build PipelineConfig from Python args. device_id is 0 (engine does not expose it); iterator uses engine clone with correct device.
+ pub fn config_from_args(
+ _engine: &CoreEngine,
+ batch_size: usize,
+ num_qubits: u32,
+ encoding_method: &str,
+ total_batches: usize,
+ seed: Option<u64>,
+ ) -> PipelineConfig {
+ PipelineConfig {
+ device_id: 0,
num_qubits,
batch_size,
total_batches,
- encoding_method,
+ encoding_method: encoding_method.to_string(),
seed,
- warmup_batches,
- };
- run_throughput_pipeline(&config)
- });
-    let res = result.map_err(|e: qdp_core::MahoutError| PyRuntimeError::new_err(e.to_string()))?;
- Ok((
- res.duration_sec,
- res.vectors_per_sec,
- res.latency_ms_per_vector,
- ))
-}
+ warmup_batches: 0,
+ }
+ }
-/// Stub when not on Linux: run_throughput_pipeline_py is only implemented on Linux (CUDA pipeline).
-#[cfg(not(target_os = "linux"))]
-#[pyfunction]
-#[pyo3(signature = (device_id=0, num_qubits=16, batch_size=64, total_batches=100, encoding_method="amplitude", warmup_batches=0, seed=None))]
-fn run_throughput_pipeline_py_impl(
- _py: Python<'_>,
- _device_id: usize,
- _num_qubits: u32,
- _batch_size: usize,
- _total_batches: usize,
- _encoding_method: &str,
- _warmup_batches: usize,
- _seed: Option<u64>,
-) -> PyResult<(f64, f64, f64)> {
- Err(PyRuntimeError::new_err(
- "run_throughput_pipeline_py is only available on Linux (CUDA
pipeline). \
- Build and run from a Linux host with CUDA.",
- ))
+ /// Resolve path from Python str or pathlib.Path (__fspath__).
+ pub fn path_from_py(path: &Bound<'_, PyAny>) -> PyResult<String> {
+ path.extract::<String>().or_else(|_| {
+ path.call_method0("__fspath__")
+ .and_then(|m| m.extract::<String>())
+ })
+ }
}
-/// Public wrapper so the same name is always present in the module (import never fails).
-#[pyfunction]
-#[pyo3(signature = (device_id=0, num_qubits=16, batch_size=64, total_batches=100, encoding_method="amplitude", warmup_batches=0, seed=None))]
-#[allow(clippy::too_many_arguments)]
-fn run_throughput_pipeline_py(
- py: Python<'_>,
- device_id: usize,
- num_qubits: u32,
- batch_size: usize,
- total_batches: usize,
- encoding_method: &str,
- warmup_batches: usize,
- seed: Option<u64>,
-) -> PyResult<(f64, f64, f64)> {
- run_throughput_pipeline_py_impl(
- py,
- device_id,
- num_qubits,
- batch_size,
- total_batches,
- encoding_method,
- warmup_batches,
- seed,
- )
-}
+#[cfg(target_os = "linux")]
+use loader_bindings::{PyQuantumLoader, config_from_args, path_from_py};
/// Quantum Data Plane (QDP) Python module
///
/// GPU-accelerated quantum data encoding with DLPack integration.
#[pymodule]
fn _qdp(m: &Bound<'_, PyModule>) -> PyResult<()> {
- // Respect RUST_LOG; try_init() is idempotent if already initialized
+    // Respect RUST_LOG for Rust log output; try_init() is no-op if already initialized.
let _ = env_logger::Builder::from_default_env().try_init();
m.add_class::<QdpEngine>()?;
m.add_class::<QuantumTensor>()?;
+ #[cfg(target_os = "linux")]
m.add_class::<PyQuantumLoader>()?;
- m.add_function(pyo3::wrap_pyfunction!(run_throughput_pipeline_py, m)?)?;
Ok(())
}
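
The factory methods registered on QdpEngine above can also be called directly from the low-level
_qdp module (Linux/CUDA only). The QuantumDataLoader builder remains the intended entry point;
this sketch only mirrors the pyo3 signatures shown in this diff, and the path is hypothetical:

    import _qdp

    engine = _qdp.QdpEngine(device_id=0)
    loader = engine.create_file_loader(
        path="/tmp/features.parquet",
        batch_size=64,
        num_qubits=4,
        encoding_method="amplitude",
        batch_limit=None,  # None -> iterate until EOF
    )
    for qt in loader:
        ...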
diff --git a/qdp/qdp-python/tests/test_dlpack_validation.py b/qdp/qdp-python/tests/test_dlpack_validation.py
index 84335c94a..ce45bdf92 100644
--- a/qdp/qdp-python/tests/test_dlpack_validation.py
+++ b/qdp/qdp-python/tests/test_dlpack_validation.py
@@ -47,27 +47,34 @@ def test_dtype_validation_float32_rejected():
engine.encode(t, num_qubits=2, encoding_method="amplitude")
msg = str(exc_info.value).lower()
assert "float64" in msg
- assert "code=" in msg or "bits=" in msg or "lanes=" in msg
+    # Accept either DLPack-style (code=/bits=/lanes=) or user-facing (float32/dtype) message
+ assert (
+ "code=" in msg
+ or "bits=" in msg
+ or "lanes=" in msg
+ or "float32" in msg
+ or "dtype" in msg
+ )
@pytest.mark.skipif(not _cuda_available(), reason="CUDA not available")
def test_stride_1d_non_contiguous_rejected():
- """Non-contiguous 1D CUDA tensor (stride != 1) should fail with actual vs
expected."""
+ """Non-contiguous 1D CUDA tensor (stride != 1) should fail with contiguous
requirement."""
engine = _engine()
# Slice so stride is 2: shape (2,), stride (2,)
t = torch.randn(4, dtype=torch.float64, device="cuda")[::2]
assert t.stride(0) != 1
with pytest.raises(RuntimeError) as exc_info:
engine.encode(t, num_qubits=1, encoding_method="amplitude")
- msg = str(exc_info.value)
- assert "contiguous" in msg.lower()
- assert "stride[0]=" in msg
- assert "expected 1" in msg or "expected 1 " in msg
+ msg = str(exc_info.value).lower()
+ assert "contiguous" in msg
+    # Accept either explicit stride[0]/expected or user-facing contiguous() hint
+ assert "stride" in msg or "contiguous()" in msg or "expected" in msg
@pytest.mark.skipif(not _cuda_available(), reason="CUDA not available")
def test_stride_2d_non_contiguous_rejected():
- """Non-contiguous 2D CUDA tensor should fail with actual vs expected
strides."""
+ """Non-contiguous 2D CUDA tensor should fail with contiguous
requirement."""
engine = _engine()
    # (4, 2) with strides (3, 2) -> not C-contiguous; expected for (4,2) is (2, 1)
t = torch.randn(4, 3, dtype=torch.float64, device="cuda")[:, ::2]
@@ -76,10 +83,10 @@ def test_stride_2d_non_contiguous_rejected():
assert t.stride(0) == 3 and t.stride(1) == 2
with pytest.raises(RuntimeError) as exc_info:
engine.encode(t, num_qubits=1, encoding_method="amplitude")
- msg = str(exc_info.value)
- assert "contiguous" in msg.lower()
- assert "strides=" in msg
- assert "expected" in msg
+ msg = str(exc_info.value).lower()
+ assert "contiguous" in msg
+ # Accept either explicit strides=/expected or user-facing contiguous() hint
+ assert "stride" in msg or "contiguous()" in msg or "expected" in msg
@pytest.mark.skipif(not _cuda_available(), reason="CUDA not available")
diff --git a/qdp/qdp-python/tests/test_quantum_data_loader.py b/qdp/qdp-python/tests/test_quantum_data_loader.py
new file mode 100644
index 000000000..5d5fb2005
--- /dev/null
+++ b/qdp/qdp-python/tests/test_quantum_data_loader.py
@@ -0,0 +1,143 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""tests for Quantum Data Loader."""
+
+import pytest
+
+try:
+ from qumat_qdp.loader import QuantumDataLoader
+except ImportError:
+ QuantumDataLoader = None # type: ignore[assignment,misc]
+
+
+def _loader_available():
+ return QuantumDataLoader is not None
+
+
[email protected](not _loader_available(), reason="QuantumDataLoader not available")
+def test_mutual_exclusion_both_sources_raises():
+ """Calling both .source_synthetic() and .source_file() then __iter__
raises ValueError."""
+ loader = (
+ QuantumDataLoader(device_id=0)
+ .qubits(4)
+ .batches(10, size=4)
+ .source_synthetic()
+ .source_file("/tmp/any.parquet")
+ )
+ with pytest.raises(ValueError) as exc_info:
+ list(loader)
+ msg = str(exc_info.value)
+ assert "Cannot set both synthetic and file sources" in msg
+ assert "source_synthetic" in msg and "source_file" in msg
+
+
[email protected](not _loader_available(), reason="QuantumDataLoader not available")
+def test_mutual_exclusion_exact_message():
+ """ValueError when both sources set: message mentions source_synthetic and
source_file."""
+ loader = (
+ QuantumDataLoader(device_id=0)
+ .qubits(4)
+ .batches(10, size=4)
+ .source_file("/tmp/x.npy")
+ .source_synthetic()
+ )
+ with pytest.raises(ValueError) as exc_info:
+ list(loader)
+ assert "Cannot set both synthetic and file sources" in str(exc_info.value)
+
+
[email protected](not _loader_available(), reason="QuantumDataLoader not available")
+def test_source_file_empty_path_raises():
+ """source_file() with empty path raises ValueError."""
+ loader = QuantumDataLoader(device_id=0).qubits(4).batches(10, size=4)
+ with pytest.raises(ValueError) as exc_info:
+ loader.source_file("")
+ assert "path" in str(exc_info.value).lower()
+
+
[email protected](not _loader_available(), reason="QuantumDataLoader not available")
+def test_synthetic_loader_batch_count():
+ """Synthetic loader yields exactly total_batches batches."""
+ total = 5
+ batch_size = 4
+ loader = (
+ QuantumDataLoader(device_id=0)
+ .qubits(4)
+ .batches(total, size=batch_size)
+ .source_synthetic()
+ )
+ try:
+ batches = list(loader)
+ except RuntimeError as e:
+ if "only available on Linux" in str(e) or "not available" in str(e):
+ pytest.skip("CUDA/Linux required for loader iteration")
+ raise
+ assert len(batches) == total
+
+
[email protected](not _loader_available(), reason="QuantumDataLoader not available")
+def test_file_loader_unsupported_extension_raises():
+ """source_file with unsupported extension raises at __iter__."""
+ loader = (
+ QuantumDataLoader(device_id=0)
+ .qubits(4)
+ .batches(10, size=4)
+ .source_file("/tmp/data.unsupported")
+ )
+ try:
+ list(loader)
+ except RuntimeError as e:
+ msg = str(e).lower()
+ if "not available" in msg:
+ pytest.skip(
+ "create_file_loader not available (e.g. extension built
without loader)"
+ )
+ return
+ assert "unsupported" in msg or "extension" in msg or "supported" in msg
+ return
+ except ValueError:
+ pytest.skip("Loader may validate path before Rust")
+ return
+ pytest.fail("Expected RuntimeError for unsupported file extension")
+
+
+# --- Streaming (source_file(..., streaming=True)) tests ---
+
+
[email protected](not _loader_available(), reason="QuantumDataLoader not available")
+def test_streaming_requires_parquet():
+ """source_file(path, streaming=True) with non-.parquet path raises
ValueError."""
+ with pytest.raises(ValueError) as exc_info:
+        QuantumDataLoader(device_id=0).qubits(4).batches(10, size=4).source_file(
+ "/tmp/data.npy", streaming=True
+ )
+ msg = str(exc_info.value).lower()
+ assert "parquet" in msg or "streaming" in msg
+
+
[email protected](not _loader_available(), reason="QuantumDataLoader not available")
+def test_streaming_parquet_extension_ok():
+ """source_file(path, streaming=True) with .parquet path does not raise at
builder."""
+ loader = (
+ QuantumDataLoader(device_id=0)
+ .qubits(4)
+ .batches(10, size=4)
+ .source_file("/tmp/data.parquet", streaming=True)
+ )
+    # Iteration may raise RuntimeError (no CUDA) or fail on missing file; we only check builder accepts.
+ assert loader._streaming_requested is True
+ assert loader._file_path == "/tmp/data.parquet"