This is an automated email from the ASF dual-hosted git repository.

guanmingchiu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/mahout.git


The following commit(s) were added to refs/heads/main by this push:
     new 7ee09afe7 [QDP] File and streaming Quantum Data Loader (#1010)
7ee09afe7 is described below

commit 7ee09afe788a99333f40f1e67bf806981bb9cf80
Author: KUAN-HAO HUANG <[email protected]>
AuthorDate: Sat Feb 7 22:45:48 2026 +0800

    [QDP] File and streaming Quantum Data Loader (#1010)
    
    * [QDP] File and streaming Quantum Data Loader
    
    * refactor and update
---
 qdp/qdp-core/src/pipeline_runner.rs              | 330 +++++++++++++++++++-
 qdp/qdp-python/qumat_qdp/loader.py               | 101 ++++--
 qdp/qdp-python/src/lib.rs                        | 378 ++++++++++-------------
 qdp/qdp-python/tests/test_dlpack_validation.py   |  29 +-
 qdp/qdp-python/tests/test_quantum_data_loader.py | 143 +++++++++
 5 files changed, 717 insertions(+), 264 deletions(-)

diff --git a/qdp/qdp-core/src/pipeline_runner.rs b/qdp/qdp-core/src/pipeline_runner.rs
index 9952e26ad..93d577b4b 100644
--- a/qdp/qdp-core/src/pipeline_runner.rs
+++ b/qdp/qdp-core/src/pipeline_runner.rs
@@ -18,11 +18,16 @@
 // Python bindings release GIL during the run.
 
 use std::f64::consts::PI;
+use std::path::Path;
+use std::sync::Mutex;
 use std::time::Instant;
 
 use crate::QdpEngine;
 use crate::dlpack::DLManagedTensor;
-use crate::error::Result;
+use crate::error::{MahoutError, Result};
+use crate::io;
+use crate::reader::StreamingDataReader;
+use crate::readers::ParquetStreamingReader;
 
 /// Configuration for throughput/latency pipeline runs (Python run_throughput_pipeline_py).
 #[derive(Clone, Debug)]
@@ -58,14 +63,111 @@ pub struct PipelineRunResult {
     pub latency_ms_per_vector: f64,
 }
 
-/// Data source for the pipeline iterator (Phase 1: Synthetic only; Phase 2: File).
-#[derive(Debug)]
+/// Data source for the pipeline iterator (Phase 1: Synthetic; Phase 2a: InMemory; Phase 2b: Streaming).
 pub enum DataSource {
     Synthetic {
         seed: u64,
         batch_index: usize,
         total_batches: usize,
     },
+    /// Phase 2a: full file loaded once; iterator slices by batch_size.
+    InMemory {
+        data: Vec<f64>,
+        cursor: usize,
+        num_samples: usize,
+        sample_size: usize,
+        batches_yielded: usize,
+        batch_limit: usize,
+    },
+    /// Phase 2b: stream from Parquet in chunks; iterator refills buffer and encodes by batch.
+    /// Reader is in Mutex so PipelineIterator remains Sync (required by PyO3 pyclass).
+    Streaming {
+        reader: Mutex<ParquetStreamingReader>,
+        buffer: Vec<f64>,
+        buffer_cursor: usize,
+        read_chunk_scratch: Vec<f64>,
+        sample_size: usize,
+        batch_limit: usize,
+        batches_yielded: usize,
+    },
+}
+
+impl std::fmt::Debug for DataSource {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            DataSource::Synthetic {
+                seed,
+                batch_index,
+                total_batches,
+            } => f
+                .debug_struct("Synthetic")
+                .field("seed", seed)
+                .field("batch_index", batch_index)
+                .field("total_batches", total_batches)
+                .finish(),
+            DataSource::InMemory {
+                cursor,
+                num_samples,
+                sample_size,
+                batches_yielded,
+                batch_limit,
+                ..
+            } => f
+                .debug_struct("InMemory")
+                .field("cursor", cursor)
+                .field("num_samples", num_samples)
+                .field("sample_size", sample_size)
+                .field("batches_yielded", batches_yielded)
+                .field("batch_limit", batch_limit)
+                .finish(),
+            DataSource::Streaming {
+                buffer,
+                buffer_cursor,
+                sample_size,
+                batch_limit,
+                batches_yielded,
+                ..
+            } => f
+                .debug_struct("Streaming")
+                .field("buffer_len", &buffer.len())
+                .field("buffer_cursor", buffer_cursor)
+                .field("sample_size", sample_size)
+                .field("batch_limit", batch_limit)
+                .field("batches_yielded", batches_yielded)
+                .finish(),
+        }
+    }
+}
+
+/// Default Parquet row group size for streaming reader (tunable).
+const DEFAULT_PARQUET_ROW_GROUP_SIZE: usize = 2048;
+
+/// When buffer_cursor >= buffer.len() / BUFFER_COMPACT_DENOM, compact by draining consumed prefix.
+const BUFFER_COMPACT_DENOM: usize = 2;
+
+/// Returns the path extension as lowercase ASCII (e.g. "parquet"), or None if missing/non-UTF8.
+fn path_extension_lower(path: &Path) -> Option<String> {
+    path.extension()
+        .and_then(|e| e.to_str())
+        .map(|s| s.to_lowercase())
+}
+
+/// Dispatches by path extension to the appropriate io reader. Returns (data, num_samples, sample_size).
+/// Unsupported or missing extension returns Err with message listing supported formats.
+fn read_file_by_extension(path: &Path) -> Result<(Vec<f64>, usize, usize)> {
+    let ext_lower = path_extension_lower(path);
+    let ext = ext_lower.as_deref();
+    match ext {
+        Some("parquet") => io::read_parquet_batch(path),
+        Some("arrow") | Some("feather") | Some("ipc") => io::read_arrow_ipc_batch(path),
+        Some("npy") => io::read_numpy_batch(path),
+        Some("pt") | Some("pth") => io::read_torch_batch(path),
+        Some("pb") => io::read_tensorflow_batch(path),
+        _ => Err(MahoutError::InvalidInput(format!(
+            "Unsupported file extension {:?}. Supported: .parquet, .arrow, .feather, .ipc, .npy, .pt, .pth, .pb",
+            path.extension()
+        ))),
+    }
 }
 
 /// Stateful iterator that yields one batch DLPack at a time for Python `for` loop consumption.
@@ -77,6 +179,9 @@ pub struct PipelineIterator {
     vector_len: usize,
 }
 
+/// (batch_data, batch_n, sample_size, num_qubits) from one source pull.
+type BatchFromSource = (Vec<f64>, usize, usize, usize);
+
 impl PipelineIterator {
     /// Create a new synthetic-data pipeline iterator.
     pub fn new_synthetic(engine: QdpEngine, config: PipelineConfig) -> Result<Self> {
@@ -94,26 +199,227 @@ impl PipelineIterator {
         })
     }
 
-    /// Returns the next batch as a DLPack pointer; `Ok(None)` when exhausted.
-    pub fn next_batch(&mut self) -> Result<Option<*mut DLManagedTensor>> {
-        let (batch_data, num_qubits) = match &mut self.source {
+    /// Create a pipeline iterator from a file (Phase 2a: load full file then slice by batch).
+    /// Dispatches by path extension; validates dimensions at construction.
+    ///
+    /// Supported extensions: .parquet, .arrow, .feather, .ipc, .npy, .pt, .pth, .pb.
+    /// For file source, `batch_limit` caps batches yielded (e.g. for testing); use `usize::MAX` to iterate until EOF.
+    pub fn new_from_file<P: AsRef<Path>>(
+        engine: QdpEngine,
+        path: P,
+        config: PipelineConfig,
+        batch_limit: usize,
+    ) -> Result<Self> {
+        let path = path.as_ref();
+        let (data, num_samples, sample_size) = read_file_by_extension(path)?;
+        let vector_len = vector_len(config.num_qubits, &config.encoding_method);
+
+        // Dimension validation at construction.
+        if sample_size != vector_len {
+            return Err(MahoutError::InvalidInput(format!(
+                "File feature length {} does not match vector_len {} for num_qubits={}, encoding={}",
+                sample_size, vector_len, config.num_qubits, config.encoding_method
+            )));
+        }
+        if data.len() != num_samples * sample_size {
+            return Err(MahoutError::InvalidInput(format!(
+                "File data length {} is not num_samples ({}) * sample_size ({})",
+                data.len(),
+                num_samples,
+                sample_size
+            )));
+        }
+
+        let source = DataSource::InMemory {
+            data,
+            cursor: 0,
+            num_samples,
+            sample_size,
+            batches_yielded: 0,
+            batch_limit,
+        };
+        Ok(Self {
+            engine,
+            config,
+            source,
+            vector_len,
+        })
+    }
+
+    /// Create a pipeline iterator from a Parquet file using streaming read (Phase 2b).
+    /// Only `.parquet` is supported; reduces memory for large files by reading in chunks.
+    /// Validates sample_size == vector_len after the first chunk.
+    pub fn new_from_file_streaming<P: AsRef<Path>>(
+        engine: QdpEngine,
+        path: P,
+        config: PipelineConfig,
+        batch_limit: usize,
+    ) -> Result<Self> {
+        let path = path.as_ref();
+        if path_extension_lower(path).as_deref() != Some("parquet") {
+            return Err(MahoutError::InvalidInput(format!(
+                "Streaming file loader supports only .parquet; got extension {:?}. Use .source_file(path) for other formats.",
+                path.extension()
+            )));
+        }
+
+        let mut reader = ParquetStreamingReader::new(path, Some(DEFAULT_PARQUET_ROW_GROUP_SIZE))?;
+        let vector_len = vector_len(config.num_qubits, &config.encoding_method);
+
+        // Read first chunk to learn sample_size; reuse as initial buffer.
+        const INITIAL_CHUNK_CAP: usize = 64 * 1024;
+        let mut buffer = vec![0.0; INITIAL_CHUNK_CAP];
+        let written = reader.read_chunk(&mut buffer)?;
+        if written == 0 {
+            return Err(MahoutError::InvalidInput(
+                "Parquet file is empty or contains no data.".to_string(),
+            ));
+        }
+        let sample_size = reader.get_sample_size().ok_or_else(|| {
+            MahoutError::InvalidInput(
+                "Parquet streaming reader did not set sample_size after first chunk.".to_string(),
+            )
+        })?;
+
+        if sample_size != vector_len {
+            return Err(MahoutError::InvalidInput(format!(
+                "File feature length {} does not match vector_len {} for num_qubits={}, encoding={}",
+                sample_size, vector_len, config.num_qubits, config.encoding_method
+            )));
+        }
+
+        buffer.truncate(written);
+        let read_chunk_scratch = vec![0.0; INITIAL_CHUNK_CAP];
+
+        let source = DataSource::Streaming {
+            reader: Mutex::new(reader),
+            buffer,
+            buffer_cursor: 0,
+            read_chunk_scratch,
+            sample_size,
+            batch_limit,
+            batches_yielded: 0,
+        };
+        Ok(Self {
+            engine,
+            config,
+            source,
+            vector_len,
+        })
+    }
+
+    /// Yields the next batch data from the current source; `None` when exhausted.
+    /// Returns (batch_data, batch_n, sample_size, num_qubits).
+    fn take_batch_from_source(&mut self) -> Result<Option<BatchFromSource>> {
+        Ok(match &mut self.source {
             DataSource::Synthetic {
                 batch_index,
                 total_batches,
                 ..
             } => {
                 if *batch_index >= *total_batches {
-                    return Ok(None);
+                    None
+                } else {
+                    let data = generate_batch(&self.config, *batch_index, self.vector_len);
+                    *batch_index += 1;
+                    Some((
+                        data,
+                        self.config.batch_size,
+                        self.vector_len,
+                        self.config.num_qubits as usize,
+                    ))
+                }
+            }
+            DataSource::InMemory {
+                data,
+                cursor,
+                sample_size,
+                batches_yielded,
+                batch_limit,
+                ..
+            } => {
+                if *batches_yielded >= *batch_limit {
+                    None
+                } else {
+                    let remaining = (data.len() - *cursor) / *sample_size;
+                    if remaining == 0 {
+                        None
+                    } else {
+                        let batch_n = remaining.min(self.config.batch_size);
+                        let start = *cursor;
+                        let end = start + batch_n * *sample_size;
+                        *cursor = end;
+                        *batches_yielded += 1;
+                        let slice = data[start..end].to_vec();
+                        Some((
+                            slice,
+                            batch_n,
+                            *sample_size,
+                            self.config.num_qubits as usize,
+                        ))
+                    }
                 }
-                let data = generate_batch(&self.config, *batch_index, self.vector_len);
-                *batch_index += 1;
-                (data, self.config.num_qubits as usize)
             }
+            DataSource::Streaming {
+                reader,
+                buffer,
+                buffer_cursor,
+                read_chunk_scratch,
+                sample_size,
+                batch_limit,
+                batches_yielded,
+            } => {
+                if *batches_yielded >= *batch_limit {
+                    None
+                } else {
+                    let required = self.config.batch_size * *sample_size;
+                    while (buffer.len() - *buffer_cursor) < required {
+                        let r = reader.get_mut().map_err(|e| {
+                            MahoutError::Io(format!("Streaming reader mutex poisoned: {}", e))
+                        })?;
+                        let written = r.read_chunk(read_chunk_scratch)?;
+                        if written == 0 {
+                            break;
+                        }
+                        buffer.extend_from_slice(&read_chunk_scratch[..written]);
+                    }
+                    let available = buffer.len() - *buffer_cursor;
+                    let available_samples = available / *sample_size;
+                    if available_samples == 0 {
+                        None
+                    } else {
+                        let batch_n = available_samples.min(self.config.batch_size);
+                        let start = *buffer_cursor;
+                        let end = start + batch_n * *sample_size;
+                        *buffer_cursor = end;
+                        *batches_yielded += 1;
+                        let slice = buffer[start..end].to_vec();
+                        if *buffer_cursor >= buffer.len() / BUFFER_COMPACT_DENOM {
+                            buffer.drain(..*buffer_cursor);
+                            *buffer_cursor = 0;
+                        }
+                        Some((
+                            slice,
+                            batch_n,
+                            *sample_size,
+                            self.config.num_qubits as usize,
+                        ))
+                    }
+                }
+            }
+        })
+    }
+
+    /// Returns the next batch as a DLPack pointer; `Ok(None)` when exhausted.
+    pub fn next_batch(&mut self) -> Result<Option<*mut DLManagedTensor>> {
+        let Some((batch_data, batch_n, sample_size, num_qubits)) = self.take_batch_from_source()?
+        else {
+            return Ok(None);
         };
         let ptr = self.engine.encode_batch(
             &batch_data,
-            self.config.batch_size,
-            self.vector_len,
+            batch_n,
+            sample_size,
             num_qubits,
             &self.config.encoding_method,
         )?;
diff --git a/qdp/qdp-python/qumat_qdp/loader.py b/qdp/qdp-python/qumat_qdp/loader.py
index d5c37d65a..6c908325d 100644
--- a/qdp/qdp-python/qumat_qdp/loader.py
+++ b/qdp/qdp-python/qumat_qdp/loader.py
@@ -35,13 +35,9 @@ from typing import TYPE_CHECKING, Iterator, Optional
 if TYPE_CHECKING:
     import _qdp  # noqa: F401 -- for type checkers only
 
-# Rust interface expects seed as Option<u64>: non-negative and <= 2^64 - 1.
-# Ref: qdp-core PipelineConfig seed: Option<u64>
+# Seed must fit Rust u64: 0 <= seed <= 2^64 - 1.
 _U64_MAX = 2**64 - 1
 
-# Lazy import _qdp at runtime until __iter__ is used; TYPE_CHECKING import above
-# is for type checkers only so they can resolve "_qdp.*" annotations if needed.
-
 
 @lru_cache(maxsize=1)
 def _get_qdp():
@@ -116,6 +112,12 @@ class QuantumDataLoader:
         self._total_batches = total_batches
         self._encoding_method = encoding_method
         self._seed = seed
+        self._file_path: Optional[str] = None
+        self._streaming_requested = (
+            False  # set True by source_file(streaming=True); Phase 2b
+        )
+        self._synthetic_requested = False  # set True only by source_synthetic()
+        self._file_requested = False
 
     def qubits(self, n: int) -> QuantumDataLoader:
         """Set number of qubits. Returns self for chaining."""
@@ -148,6 +150,7 @@ class QuantumDataLoader:
         total_batches: Optional[int] = None,
     ) -> QuantumDataLoader:
         """Use synthetic data source (default). Optionally override total_batches. Returns self."""
+        self._synthetic_requested = True
         if total_batches is not None:
             if not isinstance(total_batches, int) or total_batches < 1:
                 raise ValueError(
@@ -156,6 +159,23 @@ class QuantumDataLoader:
             self._total_batches = total_batches
         return self
 
+    def source_file(self, path: str, streaming: bool = False) -> QuantumDataLoader:
+        """Use file data source. Path must point to a supported format. Returns self.
+
+        For streaming=True (Phase 2b), only .parquet is supported; data is read in chunks to reduce memory.
+        For streaming=False, supports .parquet, .arrow, .feather, .ipc, .npy, .pt, .pth, .pb.
+        """
+        if not path or not isinstance(path, str):
+            raise ValueError(f"path must be a non-empty string, got {path!r}")
+        if streaming and not (path.lower().endswith(".parquet")):
+            raise ValueError(
+                "streaming=True supports only .parquet files; use streaming=False for other formats."
+            )
+        self._file_path = path
+        self._file_requested = True
+        self._streaming_requested = streaming
+        return self
+
     def seed(self, s: Optional[int] = None) -> QuantumDataLoader:
         """Set RNG seed for reproducible synthetic data (must fit Rust u64: 0 <= seed <= 2^64-1). Returns self."""
         if s is not None:
@@ -170,16 +190,26 @@ class QuantumDataLoader:
         self._seed = s
         return self
 
-    def __iter__(self) -> Iterator[object]:
-        """Return Rust-backed iterator that yields one QuantumTensor per batch."""
-        _validate_loader_args(
-            device_id=self._device_id,
-            num_qubits=self._num_qubits,
-            batch_size=self._batch_size,
-            total_batches=self._total_batches,
-            encoding_method=self._encoding_method,
-            seed=self._seed,
-        )
+    def _create_iterator(self) -> Iterator[object]:
+        """Build engine and return the Rust-backed loader iterator (synthetic or file)."""
+        if self._synthetic_requested and self._file_requested:
+            raise ValueError(
+                "Cannot set both synthetic and file sources; use either .source_synthetic() or .source_file(path), not both."
+            )
+        if self._file_requested and self._file_path is None:
+            raise ValueError(
+                "source_file() was not called with a path; set file source with .source_file(path)."
+            )
+        use_synthetic = not self._file_requested
+        if use_synthetic:
+            _validate_loader_args(
+                device_id=self._device_id,
+                num_qubits=self._num_qubits,
+                batch_size=self._batch_size,
+                total_batches=self._total_batches,
+                encoding_method=self._encoding_method,
+                seed=self._seed,
+            )
         qdp = _get_qdp()
         QdpEngine = getattr(qdp, "QdpEngine", None)
         if QdpEngine is None:
@@ -187,16 +217,43 @@ class QuantumDataLoader:
                 "_qdp.QdpEngine not found. Build the extension with maturin develop."
             )
         engine = QdpEngine(device_id=self._device_id)
+        if not use_synthetic:
+            if self._streaming_requested:
+                create_loader = getattr(engine, "create_streaming_file_loader", None)
+                if create_loader is None:
+                    raise RuntimeError(
+                        "create_streaming_file_loader not available (e.g. only on Linux with CUDA)."
+                    )
+            else:
+                create_loader = getattr(engine, "create_file_loader", None)
+                if create_loader is None:
+                    raise RuntimeError(
+                        "create_file_loader not available (e.g. only on Linux with CUDA)."
+                    )
+            return iter(
+                create_loader(
+                    path=self._file_path,
+                    batch_size=self._batch_size,
+                    num_qubits=self._num_qubits,
+                    encoding_method=self._encoding_method,
+                    batch_limit=None,
+                )
+            )
         create_synthetic_loader = getattr(engine, "create_synthetic_loader", None)
         if create_synthetic_loader is None:
             raise RuntimeError(
                 "create_synthetic_loader not available (e.g. only on Linux with CUDA)."
             )
-        loader = create_synthetic_loader(
-            total_batches=self._total_batches,
-            batch_size=self._batch_size,
-            num_qubits=self._num_qubits,
-            encoding_method=self._encoding_method,
-            seed=self._seed,
+        return iter(
+            create_synthetic_loader(
+                total_batches=self._total_batches,
+                batch_size=self._batch_size,
+                num_qubits=self._num_qubits,
+                encoding_method=self._encoding_method,
+                seed=self._seed,
+            )
         )
-        return iter(loader)
+
+    def __iter__(self) -> Iterator[object]:
+        """Return Rust-backed iterator that yields one QuantumTensor per batch."""
+        return self._create_iterator()
diff --git a/qdp/qdp-python/src/lib.rs b/qdp/qdp-python/src/lib.rs
index ff1083f97..7afea5284 100644
--- a/qdp/qdp-python/src/lib.rs
+++ b/qdp/qdp-python/src/lib.rs
@@ -15,21 +15,13 @@
 // limitations under the License.
 
 use numpy::{PyReadonlyArray1, PyReadonlyArray2, PyUntypedArrayMethods};
-use pyo3::exceptions::{PyRuntimeError, PyStopIteration};
+use pyo3::exceptions::PyRuntimeError;
 use pyo3::ffi;
 use pyo3::prelude::*;
 use qdp_core::dlpack::{DL_FLOAT, DLDeviceType, DLManagedTensor};
 use qdp_core::{Precision, QdpEngine as CoreEngine};
 use std::ffi::c_void;
 
-#[cfg(target_os = "linux")]
-use qdp_core::{PipelineConfig, PipelineIterator, PipelineRunResult, run_throughput_pipeline};
-
-/// Wraps raw DLPack pointer so it can cross `py.allow_threads` (closure return must be `Send`).
-/// Safe: DLPack pointer handover across contexts; GIL is released only during the closure.
-struct SendPtr(pub *mut DLManagedTensor);
-unsafe impl Send for SendPtr {}
-
 /// Quantum tensor wrapper implementing DLPack protocol
 ///
 /// This class wraps a GPU-allocated quantum state vector and implements
@@ -157,74 +149,6 @@ impl Drop for QuantumTensor {
 unsafe impl Send for QuantumTensor {}
 unsafe impl Sync for QuantumTensor {}
 
-/// Python iterator yielding one QuantumTensor (batch) per __next__. Releases GIL during next_batch().
-#[cfg(target_os = "linux")]
-#[pyclass]
-struct PyQuantumLoader {
-    inner: Option<PipelineIterator>,
-}
-
-#[cfg(target_os = "linux")]
-#[pymethods]
-impl PyQuantumLoader {
-    fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
-        slf
-    }
-
-    /// Returns the next batch as QuantumTensor; raises StopIteration when exhausted. Releases GIL during encode.
-    fn __next__(mut slf: PyRefMut<'_, Self>, py: Python<'_>) -> PyResult<QuantumTensor> {
-        let mut inner_iter = match slf.inner.take() {
-            Some(it) => it,
-            None => return Err(PyStopIteration::new_err("loader exhausted")),
-        };
-
-        #[allow(deprecated)]
-        let result = py.allow_threads(move || {
-            let res = inner_iter.next_batch();
-            match res {
-                Ok(Some(ptr)) => Ok((inner_iter, Some(SendPtr(ptr)))),
-                Ok(None) => Ok((inner_iter, None)),
-                Err(e) => Err((inner_iter, e)),
-            }
-        });
-
-        match result {
-            Ok((returned_iter, Some(send_ptr))) => {
-                slf.inner = Some(returned_iter);
-                Ok(QuantumTensor {
-                    ptr: send_ptr.0,
-                    consumed: false,
-                })
-            }
-            Ok((_, None)) => Err(PyStopIteration::new_err("loader exhausted")),
-            Err((returned_iter, e)) => {
-                slf.inner = Some(returned_iter);
-                Err(PyRuntimeError::new_err(e.to_string()))
-            }
-        }
-    }
-}
-
-/// Stub PyQuantumLoader when not on Linux (CUDA pipeline not available).
-#[cfg(not(target_os = "linux"))]
-#[pyclass]
-struct PyQuantumLoader {}
-
-#[cfg(not(target_os = "linux"))]
-#[pymethods]
-impl PyQuantumLoader {
-    fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
-        slf
-    }
-
-    fn __next__(&self, _py: Python<'_>) -> PyResult<QuantumTensor> {
-        Err(PyRuntimeError::new_err(
-            "QuantumDataLoader is only available on Linux (CUDA pipeline). \
-             Build and run from a Linux host with CUDA.",
-        ))
-    }
-}
-
 /// Helper to detect PyTorch tensor
 fn is_pytorch_tensor(obj: &Bound<'_, PyAny>) -> PyResult<bool> {
     let type_obj = obj.get_type();
@@ -340,9 +264,12 @@ fn get_torch_cuda_stream_ptr(tensor: &Bound<'_, PyAny>) -> PyResult<*mut c_void>
         )));
     }
 
-    // PyTorch default stream can report cuda_stream as 0; treat as valid (Rust sync is no-op for null).
     let stream_ptr: u64 = stream.getattr("cuda_stream")?.extract()?;
-    Ok(stream_ptr as *mut c_void)
+    Ok(if stream_ptr == 0 {
+        std::ptr::null_mut()
+    } else {
+        stream_ptr as *mut c_void
+    })
 }
 
 /// Validate a CUDA tensor for direct GPU encoding
@@ -1123,22 +1050,10 @@ impl QdpEngine {
         })
     }
 
-    /// Create a synthetic-data loader iterator for use in Python `for qt in loader`.
-    ///
-    /// Yields one QuantumTensor (batch) per iteration; releases GIL during encode.
-    /// Use with QuantumDataLoader builder or directly for streaming encode.
-    ///
-    /// Args:
-    ///     total_batches: Number of batches to yield
-    ///     batch_size: Samples per batch
-    ///     num_qubits: Qubits per sample
-    ///     encoding_method: "amplitude", "angle", or "basis"
-    ///     seed: Optional RNG seed for reproducible synthetic data
-    ///
-    /// Returns:
-    ///     PyQuantumLoader: iterator yielding QuantumTensor per __next__
+    // --- Loader factory methods (Linux only) ---
     #[cfg(target_os = "linux")]
-    #[pyo3(signature = (total_batches, batch_size=64, num_qubits=16, encoding_method="amplitude", seed=None))]
+    /// Create a synthetic-data pipeline iterator (for QuantumDataLoader.source_synthetic()).
+    #[pyo3(signature = (total_batches, batch_size, num_qubits, encoding_method, seed=None))]
     fn create_synthetic_loader(
         &self,
         total_batches: usize,
@@ -1147,168 +1062,193 @@ impl QdpEngine {
         encoding_method: &str,
         seed: Option<u64>,
     ) -> PyResult<PyQuantumLoader> {
-        let config = PipelineConfig {
-            device_id: self.engine.device().ordinal(),
-            num_qubits,
+        let config = config_from_args(
+            &self.engine,
             batch_size,
+            num_qubits,
+            encoding_method,
             total_batches,
-            encoding_method: encoding_method.to_string(),
             seed,
-            warmup_batches: 0,
-        };
-        let iter = PipelineIterator::new_synthetic(self.engine.clone(), config)
-            .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
-        Ok(PyQuantumLoader { inner: Some(iter) })
+        );
+        let iter = 
qdp_core::PipelineIterator::new_synthetic(self.engine.clone(), config).map_err(
+            |e| PyRuntimeError::new_err(format!("create_synthetic_loader 
failed: {}", e)),
+        )?;
+        Ok(PyQuantumLoader::new(Some(iter)))
     }
 
-    /// Stub when not on Linux: create_synthetic_loader is only implemented on 
Linux.
-    #[cfg(not(target_os = "linux"))]
-    #[pyo3(signature = (total_batches, batch_size=64, num_qubits=16, 
encoding_method="amplitude", seed=None))]
-    fn create_synthetic_loader(
+    #[cfg(target_os = "linux")]
+    /// Create a file-backed pipeline iterator (full read then batch; for 
QuantumDataLoader.source_file(path)).
+    #[pyo3(signature = (path, batch_size, num_qubits, encoding_method, 
batch_limit=None))]
+    fn create_file_loader(
         &self,
-        total_batches: usize,
+        py: Python<'_>,
+        path: &Bound<'_, PyAny>,
         batch_size: usize,
         num_qubits: u32,
         encoding_method: &str,
-        seed: Option<u64>,
+        batch_limit: Option<usize>,
     ) -> PyResult<PyQuantumLoader> {
-        let _ = (total_batches, batch_size, num_qubits, encoding_method, seed);
-        Err(PyRuntimeError::new_err(
-            "create_synthetic_loader is only available on Linux (CUDA 
pipeline). \
-             Build and run from a Linux host with CUDA.",
-        ))
+        let path_str = path_from_py(path)?;
+        let batch_limit = batch_limit.unwrap_or(usize::MAX);
+        let config = config_from_args(
+            &self.engine,
+            batch_size,
+            num_qubits,
+            encoding_method,
+            0,
+            None,
+        );
+        let engine = self.engine.clone();
+        let iter = py
+            .detach(|| {
+                qdp_core::PipelineIterator::new_from_file(
+                    engine,
+                    path_str.as_str(),
+                    config,
+                    batch_limit,
+                )
+            })
+            .map_err(|e| PyRuntimeError::new_err(format!("create_file_loader 
failed: {}", e)))?;
+        Ok(PyQuantumLoader::new(Some(iter)))
     }
 
-    /// Run dual-stream pipeline for encoding (H2D + kernel overlap). Internal 
API.
-    ///
-    /// Exposes run_dual_stream_pipeline from qdp-core. Accepts 1D host data 
(single sample).
-    /// Does not return a tensor; use for throughput measurement or when state 
is not needed.
-    /// Currently supports amplitude encoding only.
-    ///
-    /// Args:
-    ///     host_data: 1D input (list or NumPy array, float64)
-    ///     num_qubits: Number of qubits
-    ///     encoding_method: "amplitude" (other methods not yet supported for 
this path)
     #[cfg(target_os = "linux")]
-    fn _encode_stream_internal(
+    /// Create a streaming Parquet pipeline iterator (for 
QuantumDataLoader.source_file(path, streaming=True)).
+    #[pyo3(signature = (path, batch_size, num_qubits, encoding_method, 
batch_limit=None))]
+    fn create_streaming_file_loader(
         &self,
-        host_data: &Bound<'_, PyAny>,
-        num_qubits: usize,
+        py: Python<'_>,
+        path: &Bound<'_, PyAny>,
+        batch_size: usize,
+        num_qubits: u32,
         encoding_method: &str,
-    ) -> PyResult<()> {
-        let data_slice: Vec<f64> = if 
host_data.hasattr("__array_interface__")? {
-            let array_1d = 
host_data.extract::<PyReadonlyArray1<f64>>().map_err(|_| {
-                PyRuntimeError::new_err("host_data must be 1D NumPy array with 
dtype float64")
+        batch_limit: Option<usize>,
+    ) -> PyResult<PyQuantumLoader> {
+        let path_str = path_from_py(path)?;
+        let batch_limit = batch_limit.unwrap_or(usize::MAX);
+        let config = config_from_args(
+            &self.engine,
+            batch_size,
+            num_qubits,
+            encoding_method,
+            0,
+            None,
+        );
+        let engine = self.engine.clone();
+        let iter = py
+            .detach(|| {
+                qdp_core::PipelineIterator::new_from_file_streaming(
+                    engine,
+                    path_str.as_str(),
+                    config,
+                    batch_limit,
+                )
+            })
+            .map_err(|e| {
+                PyRuntimeError::new_err(format!("create_streaming_file_loader 
failed: {}", e))
             })?;
-            array_1d
-                .as_slice()
-                .map_err(|_| PyRuntimeError::new_err("NumPy array must be 
contiguous (C-order)"))?
-                .to_vec()
-        } else {
-            host_data.extract::<Vec<f64>>().map_err(|_| {
-                PyRuntimeError::new_err("host_data must be 1D list/array of 
float64")
-            })?
-        };
-        self.engine
-            .run_dual_stream_encode(&data_slice, num_qubits, encoding_method)
-            .map_err(|e| 
PyRuntimeError::new_err(format!("run_dual_stream_encode failed: {}", e)))
+        Ok(PyQuantumLoader::new(Some(iter)))
     }
 }
 
-/// Runs the full throughput pipeline in Rust with GIL released. Returns 
(duration_sec, vectors_per_sec, latency_ms_per_vector).
+// --- Loader bindings (Linux only; qdp-core pipeline types only built on 
Linux) ---
 #[cfg(target_os = "linux")]
-#[pyfunction]
-#[pyo3(signature = (device_id=0, num_qubits=16, batch_size=64, 
total_batches=100, encoding_method="amplitude", warmup_batches=0, seed=None))]
-#[allow(clippy::too_many_arguments)]
-fn run_throughput_pipeline_py_impl(
-    py: Python<'_>,
-    device_id: usize,
-    num_qubits: u32,
-    batch_size: usize,
-    total_batches: usize,
-    encoding_method: &str,
-    warmup_batches: usize,
-    seed: Option<u64>,
-) -> PyResult<(f64, f64, f64)> {
-    let encoding_method = encoding_method.to_string();
-    #[allow(deprecated)]
-    let result: Result<PipelineRunResult, qdp_core::MahoutError> = 
py.allow_threads(move || {
-        let config = PipelineConfig {
-            device_id,
+mod loader_bindings {
+    use super::*;
+    use pyo3::exceptions::PyStopIteration;
+    use qdp_core::{PipelineConfig, PipelineIterator};
+
+    /// Rust-backed iterator yielding one QuantumTensor per batch; used by 
QuantumDataLoader.
+    #[pyclass]
+    pub struct PyQuantumLoader {
+        inner: Option<PipelineIterator>,
+    }
+
+    impl PyQuantumLoader {
+        pub fn new(inner: Option<PipelineIterator>) -> Self {
+            Self { inner }
+        }
+    }
+
+    #[pymethods]
+    impl PyQuantumLoader {
+        fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
+            slf
+        }
+
+        fn __next__(mut slf: PyRefMut<'_, Self>) -> PyResult<QuantumTensor> {
+            let mut iter: PipelineIterator = match slf.inner.take() {
+                Some(i) => i,
+                None => return Err(PyStopIteration::new_err("")),
+            };
+            // Call next_batch without releasing GIL (return type *mut 
DLManagedTensor is !Send).
+            let result = iter.next_batch();
+            match result {
+                Ok(Some(ptr)) => {
+                    slf.inner = Some(iter);
+                    Ok(QuantumTensor {
+                        ptr,
+                        consumed: false,
+                    })
+                }
+                Ok(None) => {
+                    // Exhausted; do not put iterator back
+                    Err(PyStopIteration::new_err(""))
+                }
+                Err(e) => {
+                    slf.inner = Some(iter);
+                    Err(PyRuntimeError::new_err(format!(
+                        "Pipeline next_batch failed: {}",
+                        e
+                    )))
+                }
+            }
+        }
+    }
+
+    /// Build PipelineConfig from Python args. `_engine` is unused: device_id is 
fixed at 0 (engine does not expose it) and warmup_batches at 0; the iterator uses the engine clone with the correct device.
+    pub fn config_from_args(
+        _engine: &CoreEngine,
+        batch_size: usize,
+        num_qubits: u32,
+        encoding_method: &str,
+        total_batches: usize,
+        seed: Option<u64>,
+    ) -> PipelineConfig {
+        PipelineConfig {
+            device_id: 0,
             num_qubits,
             batch_size,
             total_batches,
-            encoding_method,
+            encoding_method: encoding_method.to_string(),
             seed,
-            warmup_batches,
-        };
-        run_throughput_pipeline(&config)
-    });
-    let res = result.map_err(|e: qdp_core::MahoutError| 
PyRuntimeError::new_err(e.to_string()))?;
-    Ok((
-        res.duration_sec,
-        res.vectors_per_sec,
-        res.latency_ms_per_vector,
-    ))
-}
+            warmup_batches: 0,
+        }
+    }
 
-/// Stub when not on Linux: run_throughput_pipeline_py is only implemented on 
Linux (CUDA pipeline).
-#[cfg(not(target_os = "linux"))]
-#[pyfunction]
-#[pyo3(signature = (device_id=0, num_qubits=16, batch_size=64, 
total_batches=100, encoding_method="amplitude", warmup_batches=0, seed=None))]
-fn run_throughput_pipeline_py_impl(
-    _py: Python<'_>,
-    _device_id: usize,
-    _num_qubits: u32,
-    _batch_size: usize,
-    _total_batches: usize,
-    _encoding_method: &str,
-    _warmup_batches: usize,
-    _seed: Option<u64>,
-) -> PyResult<(f64, f64, f64)> {
-    Err(PyRuntimeError::new_err(
-        "run_throughput_pipeline_py is only available on Linux (CUDA 
pipeline). \
-         Build and run from a Linux host with CUDA.",
-    ))
+    /// Resolve path from Python str or pathlib.Path (__fspath__).
+    pub fn path_from_py(path: &Bound<'_, PyAny>) -> PyResult<String> {
+        path.extract::<String>().or_else(|_| {
+            path.call_method0("__fspath__")
+                .and_then(|m| m.extract::<String>())
+        })
+    }
 }
 
-/// Public wrapper so the same name is always present in the module (import 
never fails).
-#[pyfunction]
-#[pyo3(signature = (device_id=0, num_qubits=16, batch_size=64, 
total_batches=100, encoding_method="amplitude", warmup_batches=0, seed=None))]
-#[allow(clippy::too_many_arguments)]
-fn run_throughput_pipeline_py(
-    py: Python<'_>,
-    device_id: usize,
-    num_qubits: u32,
-    batch_size: usize,
-    total_batches: usize,
-    encoding_method: &str,
-    warmup_batches: usize,
-    seed: Option<u64>,
-) -> PyResult<(f64, f64, f64)> {
-    run_throughput_pipeline_py_impl(
-        py,
-        device_id,
-        num_qubits,
-        batch_size,
-        total_batches,
-        encoding_method,
-        warmup_batches,
-        seed,
-    )
-}
+#[cfg(target_os = "linux")]
+use loader_bindings::{PyQuantumLoader, config_from_args, path_from_py};
 
 /// Quantum Data Plane (QDP) Python module
 ///
 /// GPU-accelerated quantum data encoding with DLPack integration.
 #[pymodule]
 fn _qdp(m: &Bound<'_, PyModule>) -> PyResult<()> {
-    // Respect RUST_LOG; try_init() is idempotent if already initialized
+    // Respect RUST_LOG for Rust log output; try_init() is no-op if already 
initialized.
     let _ = env_logger::Builder::from_default_env().try_init();
 
     m.add_class::<QdpEngine>()?;
     m.add_class::<QuantumTensor>()?;
+    #[cfg(target_os = "linux")]
     m.add_class::<PyQuantumLoader>()?;
-    m.add_function(pyo3::wrap_pyfunction!(run_throughput_pipeline_py, m)?)?;
     Ok(())
 }
diff --git a/qdp/qdp-python/tests/test_dlpack_validation.py 
b/qdp/qdp-python/tests/test_dlpack_validation.py
index 84335c94a..ce45bdf92 100644
--- a/qdp/qdp-python/tests/test_dlpack_validation.py
+++ b/qdp/qdp-python/tests/test_dlpack_validation.py
@@ -47,27 +47,34 @@ def test_dtype_validation_float32_rejected():
         engine.encode(t, num_qubits=2, encoding_method="amplitude")
     msg = str(exc_info.value).lower()
     assert "float64" in msg
-    assert "code=" in msg or "bits=" in msg or "lanes=" in msg
+    # Accept either DLPack-style (code=/bits=/lanes=) or user-facing 
(float32/dtype) message
+    assert (
+        "code=" in msg
+        or "bits=" in msg
+        or "lanes=" in msg
+        or "float32" in msg
+        or "dtype" in msg
+    )
 
 
 @pytest.mark.skipif(not _cuda_available(), reason="CUDA not available")
 def test_stride_1d_non_contiguous_rejected():
-    """Non-contiguous 1D CUDA tensor (stride != 1) should fail with actual vs 
expected."""
+    """Non-contiguous 1D CUDA tensor (stride != 1) should fail with contiguous 
requirement."""
     engine = _engine()
     # Slice so stride is 2: shape (2,), stride (2,)
     t = torch.randn(4, dtype=torch.float64, device="cuda")[::2]
     assert t.stride(0) != 1
     with pytest.raises(RuntimeError) as exc_info:
         engine.encode(t, num_qubits=1, encoding_method="amplitude")
-    msg = str(exc_info.value)
-    assert "contiguous" in msg.lower()
-    assert "stride[0]=" in msg
-    assert "expected 1" in msg or "expected 1 " in msg
+    msg = str(exc_info.value).lower()
+    assert "contiguous" in msg
+    # Accept either explicit stride[0]/expected or user-facing contiguous() 
hint
+    assert "stride" in msg or "contiguous()" in msg or "expected" in msg
 
 
 @pytest.mark.skipif(not _cuda_available(), reason="CUDA not available")
 def test_stride_2d_non_contiguous_rejected():
-    """Non-contiguous 2D CUDA tensor should fail with actual vs expected 
strides."""
+    """Non-contiguous 2D CUDA tensor should fail with contiguous 
requirement."""
     engine = _engine()
     # (4, 2) with strides (3, 2) -> not C-contiguous; expected for (4,2) is 
(2, 1)
     t = torch.randn(4, 3, dtype=torch.float64, device="cuda")[:, ::2]
@@ -76,10 +83,10 @@ def test_stride_2d_non_contiguous_rejected():
     assert t.stride(0) == 3 and t.stride(1) == 2
     with pytest.raises(RuntimeError) as exc_info:
         engine.encode(t, num_qubits=1, encoding_method="amplitude")
-    msg = str(exc_info.value)
-    assert "contiguous" in msg.lower()
-    assert "strides=" in msg
-    assert "expected" in msg
+    msg = str(exc_info.value).lower()
+    assert "contiguous" in msg
+    # Accept either explicit strides=/expected or user-facing contiguous() hint
+    assert "stride" in msg or "contiguous()" in msg or "expected" in msg
 
 
 @pytest.mark.skipif(not _cuda_available(), reason="CUDA not available")
diff --git a/qdp/qdp-python/tests/test_quantum_data_loader.py 
b/qdp/qdp-python/tests/test_quantum_data_loader.py
new file mode 100644
index 000000000..5d5fb2005
--- /dev/null
+++ b/qdp/qdp-python/tests/test_quantum_data_loader.py
@@ -0,0 +1,143 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""tests for Quantum Data Loader."""
+
+import pytest
+
+try:
+    from qumat_qdp.loader import QuantumDataLoader
+except ImportError:
+    QuantumDataLoader = None  # type: ignore[assignment,misc]
+
+
+def _loader_available():
+    return QuantumDataLoader is not None
+
+
[email protected](not _loader_available(), reason="QuantumDataLoader not 
available")
+def test_mutual_exclusion_both_sources_raises():
+    """Calling both .source_synthetic() and .source_file() then __iter__ 
raises ValueError."""
+    loader = (
+        QuantumDataLoader(device_id=0)
+        .qubits(4)
+        .batches(10, size=4)
+        .source_synthetic()
+        .source_file("/tmp/any.parquet")
+    )
+    with pytest.raises(ValueError) as exc_info:
+        list(loader)
+    msg = str(exc_info.value)
+    assert "Cannot set both synthetic and file sources" in msg
+    assert "source_synthetic" in msg and "source_file" in msg
+
+
[email protected](not _loader_available(), reason="QuantumDataLoader not 
available")
+def test_mutual_exclusion_exact_message():
+    """ValueError when source_file() precedes source_synthetic(): checks the 
core error message text."""
+    loader = (
+        QuantumDataLoader(device_id=0)
+        .qubits(4)
+        .batches(10, size=4)
+        .source_file("/tmp/x.npy")
+        .source_synthetic()
+    )
+    with pytest.raises(ValueError) as exc_info:
+        list(loader)
+    assert "Cannot set both synthetic and file sources" in str(exc_info.value)
+
+
[email protected](not _loader_available(), reason="QuantumDataLoader not 
available")
+def test_source_file_empty_path_raises():
+    """source_file() with empty path raises ValueError."""
+    loader = QuantumDataLoader(device_id=0).qubits(4).batches(10, size=4)
+    with pytest.raises(ValueError) as exc_info:
+        loader.source_file("")
+    assert "path" in str(exc_info.value).lower()
+
+
[email protected](not _loader_available(), reason="QuantumDataLoader not 
available")
+def test_synthetic_loader_batch_count():
+    """Synthetic loader yields exactly total_batches batches."""
+    total = 5
+    batch_size = 4
+    loader = (
+        QuantumDataLoader(device_id=0)
+        .qubits(4)
+        .batches(total, size=batch_size)
+        .source_synthetic()
+    )
+    try:
+        batches = list(loader)
+    except RuntimeError as e:
+        if "only available on Linux" in str(e) or "not available" in str(e):
+            pytest.skip("CUDA/Linux required for loader iteration")
+        raise
+    assert len(batches) == total
+
+
[email protected](not _loader_available(), reason="QuantumDataLoader not 
available")
+def test_file_loader_unsupported_extension_raises():
+    """source_file with unsupported extension raises at __iter__."""
+    loader = (
+        QuantumDataLoader(device_id=0)
+        .qubits(4)
+        .batches(10, size=4)
+        .source_file("/tmp/data.unsupported")
+    )
+    try:
+        list(loader)
+    except RuntimeError as e:
+        msg = str(e).lower()
+        if "not available" in msg:
+            pytest.skip(
+                "create_file_loader not available (e.g. extension built 
without loader)"
+            )
+            return
+        assert "unsupported" in msg or "extension" in msg or "supported" in msg
+        return
+    except ValueError:
+        pytest.skip("Loader may validate path before Rust")
+        return
+    pytest.fail("Expected RuntimeError for unsupported file extension")
+
+
+# --- Streaming (source_file(..., streaming=True)) tests ---
+
+
[email protected](not _loader_available(), reason="QuantumDataLoader not 
available")
+def test_streaming_requires_parquet():
+    """source_file(path, streaming=True) with non-.parquet path raises 
ValueError."""
+    with pytest.raises(ValueError) as exc_info:
+        QuantumDataLoader(device_id=0).qubits(4).batches(10, 
size=4).source_file(
+            "/tmp/data.npy", streaming=True
+        )
+    msg = str(exc_info.value).lower()
+    assert "parquet" in msg or "streaming" in msg
+
+
[email protected](not _loader_available(), reason="QuantumDataLoader not 
available")
+def test_streaming_parquet_extension_ok():
+    """source_file(path, streaming=True) with .parquet path does not raise at 
builder."""
+    loader = (
+        QuantumDataLoader(device_id=0)
+        .qubits(4)
+        .batches(10, size=4)
+        .source_file("/tmp/data.parquet", streaming=True)
+    )
+    # Iteration may raise RuntimeError (no CUDA) or fail on the missing file; 
here we only check that the builder accepts the path.
+    assert loader._streaming_requested is True
+    assert loader._file_path == "/tmp/data.parquet"


Reply via email to