This is an automated email from the ASF dual-hosted git repository.
guanmingchiu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/mahout.git
The following commit(s) were added to refs/heads/main by this push:
new 769f08be7 [QDP] Add a Quantum Data Loader and API refactor (#1000)
769f08be7 is described below
commit 769f08be788612bbe3f22064417b3cb8e97023c3
Author: KUAN-HAO HUANG <[email protected]>
AuthorDate: Mon Feb 2 12:24:24 2026 +0800
[QDP] Add a Quantum Data Loader and API refactor (#1000)
* Add a Quantum Data Loader and API refactor
* fix error
* fix routes problem and update
* update
* improve according to comments
---
docs/qdp/python-api.md | 200 +++++++++++++++
qdp/qdp-core/src/gpu/encodings/amplitude.rs | 25 +-
qdp/qdp-core/src/lib.rs | 51 ++++
qdp/qdp-core/src/pipeline_runner.rs | 248 +++++++++++++++++++
qdp/qdp-python/benchmark/__init__.py | 17 ++
qdp/qdp-python/benchmark/api.py | 23 ++
qdp/qdp-python/benchmark/benchmark_latency.py | 41 ++--
.../benchmark/benchmark_loader_throughput.py | 106 ++++++++
qdp/qdp-python/benchmark/benchmark_throughput.py | 46 ++--
qdp/qdp-python/benchmark/loader.py | 23 ++
qdp/qdp-python/benchmark/run_pipeline_baseline.py | 60 +++--
qdp/qdp-python/pyproject.toml | 11 +
qdp/qdp-python/qumat_qdp/__init__.py | 56 +++++
qdp/qdp-python/qumat_qdp/api.py | 156 ++++++++++++
qdp/qdp-python/qumat_qdp/loader.py | 202 ++++++++++++++++
qdp/qdp-python/src/lib.rs | 269 ++++++++++++++++++++-
testing/qdp/test_benchmark_api.py | 115 +++++++++
testing/qdp/test_bindings.py | 9 +-
18 files changed, 1575 insertions(+), 83 deletions(-)
diff --git a/docs/qdp/python-api.md b/docs/qdp/python-api.md
new file mode 100644
index 000000000..707969e81
--- /dev/null
+++ b/docs/qdp/python-api.md
@@ -0,0 +1,200 @@
+# QDP Python API (qumat_qdp)
+
+Public Python API for QDP: GPU-accelerated encoding, benchmark helpers, and a
batch iterator for training or evaluation loops.
+
+## Overview
+
+The **qumat_qdp** package wraps the native extension `_qdp` and adds:
+
+- **Encoding:** `QdpEngine` and `QuantumTensor` for encoding classical data
into quantum states and zero-copy DLPack integration.
+- **Benchmark:** `QdpBenchmark` for throughput/latency runs (full pipeline in
Rust, GIL released).
+- **Data loader:** `QuantumDataLoader` for iterating encoded batches one at a
time (`for qt in loader:`).
+
+Import from the package:
+
+```python
+from qumat_qdp import (
+ QdpEngine,
+ QuantumTensor,
+ QdpBenchmark,
+ ThroughputResult,
+ LatencyResult,
+ QuantumDataLoader,
+ run_throughput_pipeline_py,
+)
+```
+
+**Requirements:** Linux with NVIDIA GPU (CUDA). Loader and pipeline helpers
are stubs on other platforms and raise `RuntimeError`.
+
+---
+
+## Encoding API
+
+### QdpEngine
+
+GPU encoder. Constructor and main methods:
+
+**`QdpEngine(device_id=0, precision="float32")`**
+
+- `device_id` (int): CUDA device ID.
+- `precision` (str): `"float32"` or `"float64"`.
+- Raises `RuntimeError` on init failure or unsupported precision.
+
+**`encode(data, num_qubits, encoding_method="amplitude") -> QuantumTensor`**
+
+- `data`: list of floats, 1D/2D NumPy array (float64, C-contiguous), PyTorch
tensor (CPU/CUDA), or file path (`.parquet`, `.arrow`, `.feather`, `.npy`,
`.pt`, `.pth`, `.pb`).
+- `num_qubits` (int): Number of qubits.
+- `encoding_method` (str): `"amplitude"` | `"angle"` | `"basis"` | `"iqp"` |
`"iqp-z"`.
+- Returns a DLPack-compatible tensor; use `torch.from_dlpack(qtensor)`. Shape
`[batch_size, 2^num_qubits]`.
+
+**`create_synthetic_loader(total_batches, batch_size=64, num_qubits=16, encoding_method="amplitude", seed=None)`**
+
+- Returns an iterator that yields one `QuantumTensor` per batch. GIL is
released during each encode. Linux/CUDA only.
+
+### QuantumTensor
+
+DLPack wrapper for a GPU quantum state.
+
+- **`__dlpack__(stream=None)`:** Returns a DLPack PyCapsule (single use).
+- **`__dlpack_device__()`:** Returns `(device_type, device_id)`; CUDA is `(2, gpu_id)`.
+
+If not consumed, memory is freed when the object is dropped; if consumed (e.g.
by PyTorch), ownership transfers to the consumer.
+
+---
+
+## Benchmark API
+
+Runs the full encode pipeline in Rust (warmup + timed loop) with GIL released.
No Python-side loop.
+
+### QdpBenchmark
+
+Builder; chain methods then call `run_throughput()` or `run_latency()`.
+
+**Constructor:** `QdpBenchmark(device_id=0)`
+
+**Chainable methods:**
+
+| Method | Description |
+|--------|-------------|
+| `qubits(n)` | Number of qubits. |
+| `encoding(method)` | `"amplitude"` \| `"angle"` \| `"basis"`. |
+| `batches(total, size=64)` | Total batches and batch size. |
+| `prefetch(n)` | No-op (API compatibility). |
+| `warmup(n)` | Warmup batch count. |
+
+**`run_throughput() -> ThroughputResult`**
+
+- Requires `qubits` and `batches` to be set.
+- Returns `ThroughputResult` with `duration_sec`, `vectors_per_sec`.
+- Raises `ValueError` if config missing; `RuntimeError` if pipeline
unavailable.
+
+**`run_latency() -> LatencyResult`**
+
+- Same pipeline; returns `LatencyResult` with `duration_sec`,
`latency_ms_per_vector`.
+
+### Result types
+
+| Type | Fields |
+|------|--------|
+| `ThroughputResult` | `duration_sec`, `vectors_per_sec` |
+| `LatencyResult` | `duration_sec`, `latency_ms_per_vector` |
+
+### Example
+
+```python
+from qumat_qdp import QdpBenchmark, ThroughputResult, LatencyResult
+
+result = (
+ QdpBenchmark(device_id=0)
+ .qubits(16)
+ .encoding("amplitude")
+ .batches(100, size=64)
+ .warmup(2)
+ .run_throughput()
+)
+print(result.vectors_per_sec)
+
+lat = (
+ QdpBenchmark(device_id=0)
+ .qubits(16)
+ .encoding("amplitude")
+ .batches(100, size=64)
+ .run_latency()
+)
+print(lat.latency_ms_per_vector)
+```
+
+---
+
+## Data Loader API
+
+Iterate over encoded batches one at a time. Each batch is a `QuantumTensor`;
encoding runs in Rust with GIL released per batch.
+
+### QuantumDataLoader
+
+Builder for a synthetic-data loader. Calling `iter(loader)` (or `for qt in
loader`) creates the Rust-backed iterator.
+
+**Constructor:**
+`QuantumDataLoader(device_id=0, num_qubits=16, batch_size=64, total_batches=100, encoding_method="amplitude", seed=None)`
+
+**Chainable methods:**
+
+| Method | Description |
+|--------|-------------|
+| `qubits(n)` | Number of qubits. |
+| `encoding(method)` | `"amplitude"` \| `"angle"` \| `"basis"`. |
+| `batches(total, size=64)` | Total batches and batch size. |
+| `source_synthetic(total_batches=None)` | Synthetic data (default); optional override for total batches. |
+| `seed(s)` | RNG seed for reproducibility. |
+
+**Iteration:** `for qt in loader:` yields `QuantumTensor` of shape
`[batch_size, 2^num_qubits]`. Consume once per tensor, e.g.
`torch.from_dlpack(qt)`.
+
+### Example
+
+```python
+from qumat_qdp import QuantumDataLoader
+import torch
+
+loader = (
+ QuantumDataLoader(device_id=0)
+ .qubits(16)
+ .encoding("amplitude")
+ .batches(100, size=64)
+ .source_synthetic()
+)
+
+for qt in loader:
+ batch = torch.from_dlpack(qt) # [batch_size, 2^16]
+ # use batch ...
+```
+
+---
+
+## Low-level: run_throughput_pipeline_py
+
+Runs the full pipeline in Rust with GIL released. Used by `QdpBenchmark`; can
be called directly.
+
+**Signature:**
+`run_throughput_pipeline_py(device_id=0, num_qubits=16, batch_size=64, total_batches=100, encoding_method="amplitude", warmup_batches=0, seed=None) -> tuple[float, float, float]`
+
+**Returns:** `(duration_sec, vectors_per_sec, latency_ms_per_vector)`.
+
+**Raises:** `RuntimeError` on failure or when not available (e.g. non-Linux).
+
+---
+
+## Backward compatibility
+
+`benchmark/api.py` and `benchmark/loader.py` re-export from `qumat_qdp`.
Prefer:
+
+- `from qumat_qdp import QdpBenchmark, ThroughputResult, LatencyResult`
+- `from qumat_qdp import QuantumDataLoader`
+
+Benchmark scripts add the project root to `sys.path`, so from the `qdp-python`
directory you can run:
+
+```bash
+uv run python benchmark/run_pipeline_baseline.py
+uv run python benchmark/benchmark_loader_throughput.py
+```
+
+without setting `PYTHONPATH`.
diff --git a/qdp/qdp-core/src/gpu/encodings/amplitude.rs
b/qdp/qdp-core/src/gpu/encodings/amplitude.rs
index 032418c46..037b3bd31 100644
--- a/qdp/qdp-core/src/gpu/encodings/amplitude.rs
+++ b/qdp/qdp-core/src/gpu/encodings/amplitude.rs
@@ -316,7 +316,7 @@ impl AmplitudeEncoder {
/// streaming mechanics, while this method focuses on the amplitude
/// encoding kernel logic.
#[cfg(target_os = "linux")]
- fn encode_async_pipeline(
+ pub(crate) fn encode_async_pipeline(
device: &Arc<CudaDevice>,
host_data: &[f64],
_num_qubits: usize,
@@ -548,4 +548,27 @@ impl AmplitudeEncoder {
Ok(inv_norm)
}
+
+ /// Run dual-stream pipeline for amplitude encoding (exposed for Python /
benchmark).
+ #[cfg(target_os = "linux")]
+ pub(crate) fn run_amplitude_dual_stream_pipeline(
+ device: &Arc<CudaDevice>,
+ host_data: &[f64],
+ num_qubits: usize,
+ ) -> Result<()> {
+ Preprocessor::validate_input(host_data, num_qubits)?;
+ let state_len = 1 << num_qubits;
+ let state_vector = GpuStateVector::new(device, num_qubits,
Precision::Float64)?;
+ let norm = Preprocessor::calculate_l2_norm(host_data)?;
+ let inv_norm = 1.0 / norm;
+ Self::encode_async_pipeline(
+ device,
+ host_data,
+ num_qubits,
+ state_len,
+ inv_norm,
+ &state_vector,
+ )?;
+ Ok(())
+ }
}
diff --git a/qdp/qdp-core/src/lib.rs b/qdp/qdp-core/src/lib.rs
index 6e50414a9..1fe172d1b 100644
--- a/qdp/qdp-core/src/lib.rs
+++ b/qdp/qdp-core/src/lib.rs
@@ -35,6 +35,16 @@ mod profiling;
pub use error::{MahoutError, Result, cuda_error_to_string};
pub use gpu::memory::Precision;
+// Throughput/latency pipeline runner: single path using QdpEngine and
encode_batch in Rust.
+#[cfg(target_os = "linux")]
+mod pipeline_runner;
+
+#[cfg(target_os = "linux")]
+pub use pipeline_runner::{
+ DataSource, PipelineConfig, PipelineIterator, PipelineRunResult,
run_latency_pipeline,
+ run_throughput_pipeline,
+};
+
use std::sync::Arc;
use crate::dlpack::DLManagedTensor;
@@ -45,6 +55,7 @@ use cudarc::driver::CudaDevice;
///
/// Manages GPU context and dispatches encoding tasks.
/// Provides unified interface for device management, memory allocation, and
DLPack.
+#[derive(Clone)]
pub struct QdpEngine {
device: Arc<CudaDevice>,
precision: Precision,
@@ -110,6 +121,15 @@ impl QdpEngine {
&self.device
}
+ /// Block until all GPU work on the default stream has completed.
+ /// Used by the generic pipeline and other callers that need to sync
before timing.
+ #[cfg(target_os = "linux")]
+ pub fn synchronize(&self) -> Result<()> {
+ self.device
+ .synchronize()
+ .map_err(|e| MahoutError::Cuda(format!("CUDA device synchronize
failed: {:?}", e)))
+ }
+
/// Encode multiple samples in a single fused kernel (most efficient)
///
/// Allocates one large GPU buffer and launches a single batch kernel.
@@ -148,6 +168,37 @@ impl QdpEngine {
Ok(dlpack_ptr)
}
+ /// Run dual-stream pipeline for encoding (H2D + kernel overlap). Exposes
gpu::pipeline::run_dual_stream_pipeline.
+ /// Currently supports amplitude encoding (1D host_data). Does not return
a tensor;
+ /// use for throughput measurement or when the encoded state is not needed.
+ ///
+ /// # Arguments
+ /// * `host_data` - 1D input data (e.g. single sample for amplitude)
+ /// * `num_qubits` - Number of qubits
+ /// * `encoding_method` - Strategy (currently only "amplitude" supported
for this path)
+ #[cfg(target_os = "linux")]
+ pub fn run_dual_stream_encode(
+ &self,
+ host_data: &[f64],
+ num_qubits: usize,
+ encoding_method: &str,
+ ) -> Result<()> {
+ crate::profile_scope!("Mahout::RunDualStreamEncode");
+ match encoding_method.to_lowercase().as_str() {
+ "amplitude" => {
+
gpu::encodings::amplitude::AmplitudeEncoder::run_amplitude_dual_stream_pipeline(
+ &self.device,
+ host_data,
+ num_qubits,
+ )
+ }
+ _ => Err(MahoutError::InvalidInput(format!(
+ "run_dual_stream_encode supports only 'amplitude' for now, got
'{}'",
+ encoding_method
+ ))),
+ }
+ }
+
/// Streaming Parquet encoder with multi-threaded IO
///
/// Uses Producer-Consumer pattern: IO thread reads Parquet while GPU
processes data.
diff --git a/qdp/qdp-core/src/pipeline_runner.rs
b/qdp/qdp-core/src/pipeline_runner.rs
new file mode 100644
index 000000000..9952e26ad
--- /dev/null
+++ b/qdp/qdp-core/src/pipeline_runner.rs
@@ -0,0 +1,248 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Throughput/latency pipeline using QdpEngine and encode_batch. Full loop
runs in Rust;
+// Python bindings release GIL during the run.
+
+use std::f64::consts::PI;
+use std::time::Instant;
+
+use crate::QdpEngine;
+use crate::dlpack::DLManagedTensor;
+use crate::error::Result;
+
+/// Configuration for throughput/latency pipeline runs (Python
run_throughput_pipeline_py).
+#[derive(Clone, Debug)]
+pub struct PipelineConfig {
+ pub device_id: usize,
+ pub num_qubits: u32,
+ pub batch_size: usize,
+ pub total_batches: usize,
+ pub encoding_method: String,
+ pub seed: Option<u64>,
+ pub warmup_batches: usize,
+}
+
+impl Default for PipelineConfig {
+ fn default() -> Self {
+ Self {
+ device_id: 0,
+ num_qubits: 16,
+ batch_size: 64,
+ total_batches: 100,
+ encoding_method: "amplitude".to_string(),
+ seed: None,
+ warmup_batches: 0,
+ }
+ }
+}
+
+/// Result of a throughput or latency pipeline run.
+#[derive(Clone, Debug)]
+pub struct PipelineRunResult {
+ pub duration_sec: f64,
+ pub vectors_per_sec: f64,
+ pub latency_ms_per_vector: f64,
+}
+
+/// Data source for the pipeline iterator (Phase 1: Synthetic only; Phase 2:
File).
+#[derive(Debug)]
+pub enum DataSource {
+ Synthetic {
+ seed: u64,
+ batch_index: usize,
+ total_batches: usize,
+ },
+}
+
+/// Stateful iterator that yields one batch DLPack at a time for Python `for`
loop consumption.
+/// Holds a clone of QdpEngine, PipelineConfig, and source state; reuses
generate_batch and encode_batch.
+pub struct PipelineIterator {
+ engine: QdpEngine,
+ config: PipelineConfig,
+ source: DataSource,
+ vector_len: usize,
+}
+
+impl PipelineIterator {
+ /// Create a new synthetic-data pipeline iterator.
+ pub fn new_synthetic(engine: QdpEngine, config: PipelineConfig) ->
Result<Self> {
+ let vector_len = vector_len(config.num_qubits,
&config.encoding_method);
+ let source = DataSource::Synthetic {
+ seed: config.seed.unwrap_or(0),
+ batch_index: 0,
+ total_batches: config.total_batches,
+ };
+ Ok(Self {
+ engine,
+ config,
+ source,
+ vector_len,
+ })
+ }
+
+ /// Returns the next batch as a DLPack pointer; `Ok(None)` when exhausted.
+ pub fn next_batch(&mut self) -> Result<Option<*mut DLManagedTensor>> {
+ let (batch_data, num_qubits) = match &mut self.source {
+ DataSource::Synthetic {
+ batch_index,
+ total_batches,
+ ..
+ } => {
+ if *batch_index >= *total_batches {
+ return Ok(None);
+ }
+ let data = generate_batch(&self.config, *batch_index,
self.vector_len);
+ *batch_index += 1;
+ (data, self.config.num_qubits as usize)
+ }
+ };
+ let ptr = self.engine.encode_batch(
+ &batch_data,
+ self.config.batch_size,
+ self.vector_len,
+ num_qubits,
+ &self.config.encoding_method,
+ )?;
+ Ok(Some(ptr))
+ }
+}
+
+/// Vector length per sample for given encoding (used by pipeline and
iterator).
+pub fn vector_len(num_qubits: u32, encoding_method: &str) -> usize {
+ let n = num_qubits as usize;
+ match encoding_method.to_lowercase().as_str() {
+ "angle" => n,
+ "basis" => 1,
+ _ => 1 << n, // amplitude
+ }
+}
+
+/// Deterministic sample generation matching Python utils.build_sample
(amplitude/angle/basis).
+fn fill_sample(seed: u64, out: &mut [f64], encoding_method: &str) ->
Result<()> {
+ let len = out.len();
+ if len == 0 {
+ return Ok(());
+ }
+ match encoding_method.to_lowercase().as_str() {
+ "basis" => {
+ let mask = len.saturating_sub(1) as u64;
+ let idx = seed & mask;
+ out[0] = idx as f64;
+ }
+ "angle" => {
+ let scale = (2.0 * PI) / len as f64;
+ for (i, v) in out.iter_mut().enumerate() {
+ let mixed = (i as u64 + seed) % (len as u64);
+ *v = mixed as f64 * scale;
+ }
+ }
+ _ => {
+ // amplitude
+ let mask = (len - 1) as u64;
+ let scale = 1.0 / len as f64;
+ for (i, v) in out.iter_mut().enumerate() {
+ let mixed = (i as u64 + seed) & mask;
+ *v = mixed as f64 * scale;
+ }
+ }
+ }
+ Ok(())
+}
+
+/// Generate one batch (batch_size * vector_len elements, or batch_size * 1
for basis).
+fn generate_batch(config: &PipelineConfig, batch_idx: usize, vector_len:
usize) -> Vec<f64> {
+ let seed_base = config
+ .seed
+ .unwrap_or(0)
+ .wrapping_add((batch_idx * config.batch_size) as u64);
+ let mut batch = vec![0.0f64; config.batch_size * vector_len];
+ for i in 0..config.batch_size {
+ let offset = i * vector_len;
+ let _ = fill_sample(
+ seed_base + i as u64,
+ &mut batch[offset..offset + vector_len],
+ &config.encoding_method,
+ );
+ }
+ batch
+}
+
+/// Release DLPack tensor (call deleter so GPU memory is freed).
+unsafe fn release_dlpack(ptr: *mut DLManagedTensor) {
+ if ptr.is_null() {
+ return;
+ }
+ let managed = unsafe { &mut *ptr };
+ if let Some(deleter) = managed.deleter.take() {
+ unsafe { deleter(ptr) };
+ }
+}
+
+/// Run throughput pipeline: warmup, then timed encode_batch loop; returns
stats.
+pub fn run_throughput_pipeline(config: &PipelineConfig) ->
Result<PipelineRunResult> {
+ let engine = QdpEngine::new(config.device_id)?;
+ let vector_len = vector_len(config.num_qubits, &config.encoding_method);
+ let num_qubits = config.num_qubits as usize;
+
+ // Warmup
+ for b in 0..config.warmup_batches {
+ let batch = generate_batch(config, b, vector_len);
+ let ptr = engine.encode_batch(
+ &batch,
+ config.batch_size,
+ vector_len,
+ num_qubits,
+ &config.encoding_method,
+ )?;
+ unsafe { release_dlpack(ptr) };
+ }
+
+ #[cfg(target_os = "linux")]
+ engine.synchronize()?;
+
+ let start = Instant::now();
+ for b in 0..config.total_batches {
+ let batch = generate_batch(config, b, vector_len);
+ let ptr = engine.encode_batch(
+ &batch,
+ config.batch_size,
+ vector_len,
+ num_qubits,
+ &config.encoding_method,
+ )?;
+ unsafe { release_dlpack(ptr) };
+ }
+
+ #[cfg(target_os = "linux")]
+ engine.synchronize()?;
+
+ let duration_sec = start.elapsed().as_secs_f64().max(1e-9);
+ let total_vectors = config.total_batches * config.batch_size;
+ let vectors_per_sec = total_vectors as f64 / duration_sec;
+ let latency_ms_per_vector = (duration_sec / total_vectors as f64) * 1000.0;
+
+ Ok(PipelineRunResult {
+ duration_sec,
+ vectors_per_sec,
+ latency_ms_per_vector,
+ })
+}
+
+/// Run latency pipeline (same as throughput; returns same stats; name for API
parity).
+pub fn run_latency_pipeline(config: &PipelineConfig) ->
Result<PipelineRunResult> {
+ run_throughput_pipeline(config)
+}
diff --git a/qdp/qdp-python/benchmark/__init__.py
b/qdp/qdp-python/benchmark/__init__.py
new file mode 100644
index 000000000..1ba132fde
--- /dev/null
+++ b/qdp/qdp-python/benchmark/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Benchmark scripts and utilities for QDP Python.
diff --git a/qdp/qdp-python/benchmark/api.py b/qdp/qdp-python/benchmark/api.py
new file mode 100644
index 000000000..b2cbda019
--- /dev/null
+++ b/qdp/qdp-python/benchmark/api.py
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Re-export benchmark API from qumat_qdp. Prefer: from qumat_qdp import
QdpBenchmark, ThroughputResult, LatencyResult."""
+
+from __future__ import annotations
+
+from qumat_qdp import LatencyResult, QdpBenchmark, ThroughputResult
+
+__all__ = ["LatencyResult", "QdpBenchmark", "ThroughputResult"]
diff --git a/qdp/qdp-python/benchmark/benchmark_latency.py
b/qdp/qdp-python/benchmark/benchmark_latency.py
index 6e692b63b..c22ad774e 100644
--- a/qdp/qdp-python/benchmark/benchmark_latency.py
+++ b/qdp/qdp-python/benchmark/benchmark_latency.py
@@ -18,8 +18,8 @@
"""
Data-to-State latency benchmark: CPU RAM -> GPU VRAM.
-Run:
- python qdp/qdp-python/benchmark/benchmark_latency.py --qubits 16 \
+Run from qdp-python directory (qumat_qdp must be importable, e.g. via uv):
+ uv run python benchmark/benchmark_latency.py --qubits 16 \\
--batches 200 --batch-size 64 --prefetch 16
"""
@@ -30,8 +30,8 @@ import time
import torch
-from _qdp import QdpEngine
-from utils import normalize_batch, prefetched_batches
+from benchmark.utils import normalize_batch, prefetched_batches
+from qumat_qdp import QdpBenchmark
BAR = "=" * 70
SEP = "-" * 70
@@ -92,30 +92,25 @@ def run_mahout(
prefetch: int,
encoding_method: str = "amplitude",
):
+ """Run Mahout latency using the generic user API (QdpBenchmark)."""
try:
- engine = QdpEngine(0)
+ result = (
+ QdpBenchmark(device_id=0)
+ .qubits(num_qubits)
+ .encoding(encoding_method)
+ .batches(total_batches, size=batch_size)
+ .prefetch(prefetch)
+ .run_latency()
+ )
except Exception as exc:
print(f"[Mahout] Init failed: {exc}")
return 0.0, 0.0
- vector_len = num_qubits if encoding_method == "angle" else (1 <<
num_qubits)
- sync_cuda()
- start = time.perf_counter()
- processed = 0
-
- for batch in prefetched_batches(
- total_batches, batch_size, vector_len, prefetch, encoding_method
- ):
- normalized = normalize_batch(batch, encoding_method)
- qtensor = engine.encode(normalized, num_qubits, encoding_method)
- _ = torch.utils.dlpack.from_dlpack(qtensor)
- processed += normalized.shape[0]
-
- sync_cuda()
- duration = time.perf_counter() - start
- latency_ms = (duration / processed) * 1000 if processed > 0 else 0.0
- print(f" Total Time: {duration:.4f} s ({latency_ms:.3f} ms/vector)")
- return duration, latency_ms
+ print(
+ f" Total Time: {result.duration_sec:.4f} s "
+ f"({result.latency_ms_per_vector:.3f} ms/vector)"
+ )
+ return result.duration_sec, result.latency_ms_per_vector
def run_pennylane(num_qubits: int, total_batches: int, batch_size: int,
prefetch: int):
diff --git a/qdp/qdp-python/benchmark/benchmark_loader_throughput.py
b/qdp/qdp-python/benchmark/benchmark_loader_throughput.py
new file mode 100644
index 000000000..223e68555
--- /dev/null
+++ b/qdp/qdp-python/benchmark/benchmark_loader_throughput.py
@@ -0,0 +1,106 @@
+#!/usr/bin/env python3
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Throughput benchmark using QuantumDataLoader (for qt in loader).
+
+Compares iterator-based throughput with run_throughput_pipeline_py.
+Expectation: loader version slightly slower due to Python boundary per batch.
+
+Run from qdp-python directory (qumat_qdp must be importable, e.g. via uv):
+ uv run python benchmark/benchmark_loader_throughput.py --qubits 16 --batches
200 --batch-size 64
+"""
+
+from __future__ import annotations
+
+import argparse
+import time
+
+from qumat_qdp import QuantumDataLoader, QdpBenchmark
+
+
+def run_loader_throughput(
+ num_qubits: int,
+ total_batches: int,
+ batch_size: int,
+ encoding_method: str = "amplitude",
+) -> tuple[float, float]:
+ """Run throughput by iterating QuantumDataLoader; returns (duration_sec,
vectors_per_sec)."""
+ loader = (
+ QuantumDataLoader(device_id=0)
+ .qubits(num_qubits)
+ .encoding(encoding_method)
+ .batches(total_batches, size=batch_size)
+ .source_synthetic()
+ )
+ total_vectors = total_batches * batch_size
+ start = time.perf_counter()
+ count = 0
+ for qt in loader:
+ count += 1
+ # Consumer: touch tensor (e.g. could torch.from_dlpack(qt) and use it)
+ _ = qt
+ elapsed = time.perf_counter() - start
+ if count != total_batches:
+ raise RuntimeError(f"Expected {total_batches} batches, got {count}")
+ duration_sec = max(elapsed, 1e-9)
+ vectors_per_sec = total_vectors / duration_sec
+ return duration_sec, vectors_per_sec
+
+
+def main() -> None:
+ parser = argparse.ArgumentParser(
+ description="QuantumDataLoader throughput benchmark"
+ )
+ parser.add_argument("--qubits", type=int, default=16)
+ parser.add_argument("--batches", type=int, default=200)
+ parser.add_argument("--batch-size", type=int, default=64)
+ parser.add_argument("--encoding", type=str, default="amplitude")
+ parser.add_argument("--trials", type=int, default=3)
+ args = parser.parse_args()
+
+ print("QuantumDataLoader throughput (for qt in loader)")
+ print(
+ f" qubits={args.qubits}, batches={args.batches},
batch_size={args.batch_size}"
+ )
+
+ loader_times: list[float] = []
+ for t in range(args.trials):
+ dur, vps = run_loader_throughput(
+ args.qubits, args.batches, args.batch_size, args.encoding
+ )
+ loader_times.append(vps)
+ print(f" Trial {t + 1}: {dur:.4f} s, {vps:.1f} vec/s")
+
+ median_vps = sorted(loader_times)[len(loader_times) // 2]
+ print(f" Median: {median_vps:.1f} vec/s")
+
+ # Compare with full Rust pipeline (single boundary)
+ print("\nQdpBenchmark.run_throughput() (full Rust pipeline, single
boundary):")
+ result = (
+ QdpBenchmark(device_id=0)
+ .qubits(args.qubits)
+ .encoding(args.encoding)
+ .batches(args.batches, size=args.batch_size)
+ .run_throughput()
+ )
+ print(f" {result.duration_sec:.4f} s, {result.vectors_per_sec:.1f} vec/s")
+ print(f"\nLoader vs pipeline ratio: {median_vps /
result.vectors_per_sec:.3f}")
+
+
+if __name__ == "__main__":
+ main()
diff --git a/qdp/qdp-python/benchmark/benchmark_throughput.py
b/qdp/qdp-python/benchmark/benchmark_throughput.py
index 15c67646f..acb7e067d 100644
--- a/qdp/qdp-python/benchmark/benchmark_throughput.py
+++ b/qdp/qdp-python/benchmark/benchmark_throughput.py
@@ -23,8 +23,8 @@ The workload mirrors the
`qdp-core/examples/dataloader_throughput.rs` pipeline:
- Prefetch on the CPU side to keep the GPU fed.
- Encode vectors into amplitude states on GPU and run a tiny consumer op.
-Run:
- python qdp/benchmark/benchmark_throughput.py --qubits 16 --batches 200
--batch-size 64
+Run from qdp-python directory (qumat_qdp must be importable, e.g. via uv):
+ uv run python benchmark/benchmark_throughput.py --qubits 16 --batches 200
--batch-size 64
"""
import argparse
@@ -33,8 +33,8 @@ import time
import numpy as np
import torch
-from _qdp import QdpEngine
-from utils import normalize_batch, prefetched_batches
+from benchmark.utils import normalize_batch, prefetched_batches
+from qumat_qdp import QdpBenchmark
BAR = "=" * 70
SEP = "-" * 70
@@ -83,34 +83,26 @@ def run_mahout(
prefetch: int,
encoding_method: str = "amplitude",
):
+ """Run Mahout throughput using the generic user API (QdpBenchmark)."""
try:
- engine = QdpEngine(0)
+ result = (
+ QdpBenchmark(device_id=0)
+ .qubits(num_qubits)
+ .encoding(encoding_method)
+ .batches(total_batches, size=batch_size)
+ .prefetch(prefetch)
+ .run_throughput()
+ )
except Exception as exc:
print(f"[Mahout] Init failed: {exc}")
return 0.0, 0.0
- torch.cuda.synchronize()
- start = time.perf_counter()
-
- vector_len = num_qubits if encoding_method == "angle" else (1 <<
num_qubits)
- processed = 0
- for batch in prefetched_batches(
- total_batches, batch_size, vector_len, prefetch, encoding_method
- ):
- normalized = np.ascontiguousarray(
- normalize_batch(batch, encoding_method), dtype=np.float64
- )
- qtensor = engine.encode(normalized, num_qubits, encoding_method)
- tensor = torch.from_dlpack(qtensor).abs().to(torch.float32)
- _ = tensor.sum()
- processed += normalized.shape[0]
-
- torch.cuda.synchronize()
- duration = time.perf_counter() - start
- throughput = processed / duration if duration > 0 else 0.0
- print(f" IO + Encode Time: {duration:.4f} s")
- print(f" Total Time: {duration:.4f} s ({throughput:.1f} vectors/sec)")
- return duration, throughput
+ print(f" IO + Encode Time: {result.duration_sec:.4f} s")
+ print(
+ f" Total Time: {result.duration_sec:.4f} s "
+ f"({result.vectors_per_sec:.1f} vectors/sec)"
+ )
+ return result.duration_sec, result.vectors_per_sec
def run_pennylane(num_qubits: int, total_batches: int, batch_size: int,
prefetch: int):
diff --git a/qdp/qdp-python/benchmark/loader.py
b/qdp/qdp-python/benchmark/loader.py
new file mode 100644
index 000000000..0b6988eed
--- /dev/null
+++ b/qdp/qdp-python/benchmark/loader.py
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Re-export QuantumDataLoader from qumat_qdp. Prefer: from qumat_qdp import
QuantumDataLoader."""
+
+from __future__ import annotations
+
+from qumat_qdp import QuantumDataLoader
+
+__all__ = ["QuantumDataLoader"]
diff --git a/qdp/qdp-python/benchmark/run_pipeline_baseline.py
b/qdp/qdp-python/benchmark/run_pipeline_baseline.py
index c813c8333..e6466c901 100644
--- a/qdp/qdp-python/benchmark/run_pipeline_baseline.py
+++ b/qdp/qdp-python/benchmark/run_pipeline_baseline.py
@@ -22,14 +22,26 @@ Runs throughput and latency benchmarks multiple times
(default 5), computes
median/p95, gathers system metadata, and writes CSV + markdown report to
qdp/docs/optimization/results/.
+Uses the Rust-optimized pipeline only (qumat_qdp.QdpBenchmark ->
_qdp.run_throughput_pipeline_py).
+No Python for loop; all scheduling is in Rust.
+
Set observability before running (recommended):
export QDP_ENABLE_POOL_METRICS=1
export QDP_ENABLE_OVERLAP_TRACKING=1
export RUST_LOG=info
-Usage:
- cd qdp/qdp-python/benchmark
- uv run python run_pipeline_baseline.py --qubits 16 --batch-size 64
--prefetch 16 --batches 500 --trials 20
+Usage (from qdp-python):
+ cd qdp/qdp-python
+ uv run python benchmark/run_pipeline_baseline.py --qubits 16 --batch-size 64
--batches 500 --trials 20
+
+If you see "run_throughput_pipeline_py is missing", uv is using a cached
wheel. Force a rebuild:
+ uv sync --refresh-package qumat-qdp
+ uv run python benchmark/run_pipeline_baseline.py ...
+
+Alternatively build and run without uv run:
+ maturin develop
+ .venv/bin/python benchmark/run_pipeline_baseline.py ...
+ # or: ./benchmark/run_baseline.sh ...
"""
from __future__ import annotations
@@ -48,12 +60,16 @@ os.environ.setdefault("QDP_ENABLE_POOL_METRICS", "1")
os.environ.setdefault("QDP_ENABLE_OVERLAP_TRACKING", "1")
os.environ.setdefault("RUST_LOG", "info")
-from benchmark_latency import run_mahout as run_mahout_latency
-from benchmark_throughput import run_mahout as run_mahout_throughput
+# Add project root to path so qumat_qdp is importable when run as script
+_benchmark_dir = Path(__file__).resolve().parent
+_project_root = _benchmark_dir.parent
+if str(_project_root) not in sys.path:
+ sys.path.insert(0, str(_project_root))
+
+from qumat_qdp import QdpBenchmark # noqa: E402
def _repo_root() -> Path:
- # benchmark -> qdp-python -> qdp -> mahout (workspace root)
return Path(__file__).resolve().parent.parent.parent.parent
@@ -116,13 +132,19 @@ def run_throughput_trials(
trials: int,
encoding: str,
) -> list[float]:
+ """Run throughput trials using the generic user API (QdpBenchmark)."""
throughputs: list[float] = []
- for i in range(trials):
- _duration, throughput = run_mahout_throughput(
- qubits, batches, batch_size, prefetch, encoding
+ for _ in range(trials):
+ result = (
+ QdpBenchmark(device_id=0)
+ .qubits(qubits)
+ .encoding(encoding)
+ .batches(batches, size=batch_size)
+ .prefetch(prefetch)
+ .run_throughput()
)
- if throughput > 0:
- throughputs.append(throughput)
+ if result.vectors_per_sec > 0:
+ throughputs.append(result.vectors_per_sec)
return throughputs
@@ -134,13 +156,19 @@ def run_latency_trials(
trials: int,
encoding: str,
) -> list[float]:
+ """Run latency trials using the generic user API (QdpBenchmark)."""
latencies_ms: list[float] = []
- for i in range(trials):
- _duration, latency_ms = run_mahout_latency(
- qubits, batches, batch_size, prefetch, encoding
+ for _ in range(trials):
+ result = (
+ QdpBenchmark(device_id=0)
+ .qubits(qubits)
+ .encoding(encoding)
+ .batches(batches, size=batch_size)
+ .prefetch(prefetch)
+ .run_latency()
)
- if latency_ms > 0:
- latencies_ms.append(latency_ms)
+ if result.latency_ms_per_vector > 0:
+ latencies_ms.append(result.latency_ms_per_vector)
return latencies_ms
diff --git a/qdp/qdp-python/pyproject.toml b/qdp/qdp-python/pyproject.toml
index 72f663a5a..f7fd659d6 100644
--- a/qdp/qdp-python/pyproject.toml
+++ b/qdp/qdp-python/pyproject.toml
@@ -38,5 +38,16 @@ name = "pytorch"
url = "https://download.pytorch.org/whl/cu122"
explicit = true
+# Invalidate uv cache when Rust or Cargo changes so extension is rebuilt
(run_throughput_pipeline_py etc.).
+# Ref: https://docs.astral.sh/uv/concepts/cache/#dynamic-metadata
+[tool.uv]
+cache-keys = [
+ { file = "pyproject.toml" },
+ { file = "Cargo.toml" },
+ { file = "src/**" },
+]
+
[tool.maturin]
module-name = "_qdp"
+# Package at project root (qumat_qdp/) so editable install and uv run find it
+python-source = "."
diff --git a/qdp/qdp-python/qumat_qdp/__init__.py
b/qdp/qdp-python/qumat_qdp/__init__.py
new file mode 100644
index 000000000..79d1cecdd
--- /dev/null
+++ b/qdp/qdp-python/qumat_qdp/__init__.py
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+QDP (Quantum Data Processing) Python API.
+
+Public API: QdpEngine, QuantumTensor (Rust extension _qdp),
+QdpBenchmark, ThroughputResult, LatencyResult (benchmark API),
+QuantumDataLoader (data loader iterator).
+
+Usage:
+ from qumat_qdp import QdpEngine, QuantumTensor
+ from qumat_qdp import QdpBenchmark, ThroughputResult, LatencyResult
+ from qumat_qdp import QuantumDataLoader
+"""
+
+from __future__ import annotations
+
+# Rust extension (built by maturin). QdpEngine/QuantumTensor are public for
+# advanced use; QdpBenchmark and QuantumDataLoader are the recommended
high-level API.
+import _qdp
+
+from qumat_qdp.api import (
+ LatencyResult,
+ QdpBenchmark,
+ ThroughputResult,
+)
+from qumat_qdp.loader import QuantumDataLoader
+
+# Re-export Rust extension types
+QdpEngine = _qdp.QdpEngine
+QuantumTensor = _qdp.QuantumTensor
+run_throughput_pipeline_py = getattr(_qdp, "run_throughput_pipeline_py", None)
+
+__all__ = [
+ "LatencyResult",
+ "QdpBenchmark",
+ "QdpEngine",
+ "QuantumDataLoader",
+ "QuantumTensor",
+ "ThroughputResult",
+ "run_throughput_pipeline_py",
+]
diff --git a/qdp/qdp-python/qumat_qdp/api.py b/qdp/qdp-python/qumat_qdp/api.py
new file mode 100644
index 000000000..2fffbbc09
--- /dev/null
+++ b/qdp/qdp-python/qumat_qdp/api.py
@@ -0,0 +1,156 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Benchmark API: Rust-optimized pipeline only (no Python for loop).
+
+Usage:
+ from qumat_qdp import QdpBenchmark, ThroughputResult, LatencyResult
+
+ result = (QdpBenchmark(device_id=0).qubits(16).encoding("amplitude")
+ .batches(100, size=64).warmup(2).run_throughput())
+ # result.duration_sec, result.vectors_per_sec
+
+ lat = (QdpBenchmark(device_id=0).qubits(16).encoding("amplitude")
+ .batches(100, size=64).run_latency())
+ # lat.latency_ms_per_vector
+"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+from typing import Optional
+
+
+@dataclass
+class ThroughputResult:
+ """Result of run_throughput(): duration and vectors per second."""
+
+ duration_sec: float
+ vectors_per_sec: float
+
+
+@dataclass
+class LatencyResult:
+ """Result of run_latency(): duration and ms per vector."""
+
+ duration_sec: float
+ latency_ms_per_vector: float
+
+
+# Cached reference to Rust pipeline (avoids repeated import).
+_run_throughput_pipeline_py: Optional[object] = None
+
+
+def _get_run_throughput_pipeline_py():
+ """Return Rust run_throughput_pipeline_py; raise if not available."""
+ global _run_throughput_pipeline_py
+ if _run_throughput_pipeline_py is not None:
+ return _run_throughput_pipeline_py
+ import _qdp
+
+ fn = getattr(_qdp, "run_throughput_pipeline_py", None)
+ if fn is None:
+ raise RuntimeError(
+ "Rust pipeline not available: _qdp.run_throughput_pipeline_py is
missing. "
+ "Force uv to rebuild the extension: from qdp-python run `uv sync
--refresh-package qumat-qdp` "
+ "then `uv run python benchmark/run_pipeline_baseline.py`. Or run
`maturin develop` and use "
+ "`.venv/bin/python` or `benchmark/run_baseline.sh`."
+ )
+ _run_throughput_pipeline_py = fn
+ return fn
+
+
+class QdpBenchmark:
+ """
+ Builder for throughput/latency benchmarks. Backend is Rust optimized
pipeline only.
+
+ No Python for loop; run_throughput_pipeline_py runs the full pipeline in
Rust
+ (single Python boundary, GIL released). Requires
_qdp.run_throughput_pipeline_py
+ (Linux/CUDA build).
+ """
+
+ def __init__(self, device_id: int = 0):
+ self._device_id = device_id
+ self._num_qubits: Optional[int] = None
+ self._encoding_method: str = "amplitude"
+ self._total_batches: Optional[int] = None
+ self._batch_size: int = 64
+ self._warmup_batches: int = 0
+
+ def qubits(self, n: int) -> "QdpBenchmark":
+ self._num_qubits = n
+ return self
+
+ def encoding(self, method: str) -> "QdpBenchmark":
+ self._encoding_method = method
+ return self
+
+ def batches(self, total: int, size: int = 64) -> "QdpBenchmark":
+ self._total_batches = total
+ self._batch_size = size
+ return self
+
+ def prefetch(self, n: int) -> "QdpBenchmark":
+ """No-op for API compatibility; Rust pipeline does not use prefetch
from Python."""
+ return self
+
+ def warmup(self, n: int) -> "QdpBenchmark":
+ self._warmup_batches = n
+ return self
+
+ def run_throughput(self) -> ThroughputResult:
+ """Run throughput via Rust optimized pipeline (no Python for loop)."""
+ if self._num_qubits is None or self._total_batches is None:
+ raise ValueError(
+ "Set qubits and batches (e.g. .qubits(16).batches(100, 64))"
+ )
+
+ run_rust = _get_run_throughput_pipeline_py()
+ duration_sec, vectors_per_sec, _ = run_rust(
+ device_id=self._device_id,
+ num_qubits=self._num_qubits,
+ batch_size=self._batch_size,
+ total_batches=self._total_batches,
+ encoding_method=self._encoding_method,
+ warmup_batches=self._warmup_batches,
+ seed=None,
+ )
+ return ThroughputResult(
+ duration_sec=duration_sec, vectors_per_sec=vectors_per_sec
+ )
+
+ def run_latency(self) -> LatencyResult:
+ """Run latency via Rust optimized pipeline (no Python for loop)."""
+ if self._num_qubits is None or self._total_batches is None:
+ raise ValueError(
+ "Set qubits and batches (e.g. .qubits(16).batches(100, 64))"
+ )
+
+ run_rust = _get_run_throughput_pipeline_py()
+ duration_sec, _, latency_ms_per_vector = run_rust(
+ device_id=self._device_id,
+ num_qubits=self._num_qubits,
+ batch_size=self._batch_size,
+ total_batches=self._total_batches,
+ encoding_method=self._encoding_method,
+ warmup_batches=self._warmup_batches,
+ seed=None,
+ )
+ return LatencyResult(
+ duration_sec=duration_sec,
+ latency_ms_per_vector=latency_ms_per_vector,
+ )
diff --git a/qdp/qdp-python/qumat_qdp/loader.py
b/qdp/qdp-python/qumat_qdp/loader.py
new file mode 100644
index 000000000..d5c37d65a
--- /dev/null
+++ b/qdp/qdp-python/qumat_qdp/loader.py
@@ -0,0 +1,202 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Quantum Data Loader: Python builder for Rust-backed batch iterator.
+
+Usage:
+ from qumat_qdp import QuantumDataLoader
+
+ loader = (QuantumDataLoader(device_id=0).qubits(16).encoding("amplitude")
+ .batches(100, size=64).source_synthetic())
+ for qt in loader:
+ batch = torch.from_dlpack(qt)
+ ...
+"""
+
+from __future__ import annotations
+
+from functools import lru_cache
+from typing import TYPE_CHECKING, Iterator, Optional
+
+if TYPE_CHECKING:
+ import _qdp # noqa: F401 -- for type checkers only
+
+# Rust interface expects seed as Option<u64>: non-negative and <= 2^64 - 1.
+# Ref: qdp-core PipelineConfig seed: Option<u64>
+_U64_MAX = 2**64 - 1
+
+# Defer importing _qdp until __iter__ is called; the TYPE_CHECKING import
above
+# is for type checkers only so they can resolve "_qdp.*" annotations if needed.
+
+
+@lru_cache(maxsize=1)
+def _get_qdp():
+ import _qdp as m
+
+ return m
+
+
+def _validate_loader_args(
+ *,
+ device_id: int,
+ num_qubits: int,
+ batch_size: int,
+ total_batches: int,
+ encoding_method: str,
+ seed: Optional[int],
+) -> None:
+ """Validate arguments before passing to Rust (PipelineConfig /
create_synthetic_loader)."""
+ if device_id < 0:
+ raise ValueError(f"device_id must be non-negative, got {device_id!r}")
+ if not isinstance(num_qubits, int) or num_qubits < 1:
+ raise ValueError(f"num_qubits must be a positive integer, got
{num_qubits!r}")
+ if not isinstance(batch_size, int) or batch_size < 1:
+ raise ValueError(f"batch_size must be a positive integer, got
{batch_size!r}")
+ if not isinstance(total_batches, int) or total_batches < 1:
+ raise ValueError(
+ f"total_batches must be a positive integer, got {total_batches!r}"
+ )
+ if not encoding_method or not isinstance(encoding_method, str):
+ raise ValueError(
+ f"encoding_method must be a non-empty string, got
{encoding_method!r}"
+ )
+ if seed is not None:
+ if not isinstance(seed, int):
+ raise ValueError(
+ f"seed must be None or an integer, got {type(seed).__name__!r}"
+ )
+ if seed < 0 or seed > _U64_MAX:
+ raise ValueError(
+ f"seed must be in range [0, {_U64_MAX}] (Rust u64), got
{seed!r}"
+ )
+
+
+class QuantumDataLoader:
+ """
+ Builder for a synthetic-data quantum encoding iterator.
+
+ Yields one QuantumTensor (batch) per iteration. All encoding runs in Rust;
+ __iter__ returns the Rust-backed iterator from create_synthetic_loader.
+ """
+
+ def __init__(
+ self,
+ device_id: int = 0,
+ num_qubits: int = 16,
+ batch_size: int = 64,
+ total_batches: int = 100,
+ encoding_method: str = "amplitude",
+ seed: Optional[int] = None,
+ ) -> None:
+ _validate_loader_args(
+ device_id=device_id,
+ num_qubits=num_qubits,
+ batch_size=batch_size,
+ total_batches=total_batches,
+ encoding_method=encoding_method,
+ seed=seed,
+ )
+ self._device_id = device_id
+ self._num_qubits = num_qubits
+ self._batch_size = batch_size
+ self._total_batches = total_batches
+ self._encoding_method = encoding_method
+ self._seed = seed
+
+ def qubits(self, n: int) -> QuantumDataLoader:
+ """Set number of qubits. Returns self for chaining."""
+ if not isinstance(n, int) or n < 1:
+ raise ValueError(f"num_qubits must be a positive integer, got
{n!r}")
+ self._num_qubits = n
+ return self
+
+ def encoding(self, method: str) -> QuantumDataLoader:
+ """Set encoding method (e.g. 'amplitude', 'angle', 'basis'). Returns
self."""
+ if not method or not isinstance(method, str):
+ raise ValueError(
+ f"encoding_method must be a non-empty string, got {method!r}"
+ )
+ self._encoding_method = method
+ return self
+
+ def batches(self, total: int, size: int = 64) -> QuantumDataLoader:
+ """Set total number of batches and batch size. Returns self."""
+ if not isinstance(total, int) or total < 1:
+ raise ValueError(f"total_batches must be a positive integer, got
{total!r}")
+ if not isinstance(size, int) or size < 1:
+ raise ValueError(f"batch_size must be a positive integer, got
{size!r}")
+ self._total_batches = total
+ self._batch_size = size
+ return self
+
+ def source_synthetic(
+ self,
+ total_batches: Optional[int] = None,
+ ) -> QuantumDataLoader:
+ """Use synthetic data source (default). Optionally override
total_batches. Returns self."""
+ if total_batches is not None:
+ if not isinstance(total_batches, int) or total_batches < 1:
+ raise ValueError(
+ f"total_batches must be a positive integer, got
{total_batches!r}"
+ )
+ self._total_batches = total_batches
+ return self
+
+ def seed(self, s: Optional[int] = None) -> QuantumDataLoader:
+ """Set RNG seed for reproducible synthetic data (must fit Rust u64: 0
<= seed <= 2^64-1). Returns self."""
+ if s is not None:
+ if not isinstance(s, int):
+ raise ValueError(
+ f"seed must be None or an integer, got
{type(s).__name__!r}"
+ )
+ if s < 0 or s > _U64_MAX:
+ raise ValueError(
+ f"seed must be in range [0, {_U64_MAX}] (Rust u64), got
{s!r}"
+ )
+ self._seed = s
+ return self
+
+ def __iter__(self) -> Iterator[object]:
+ """Return Rust-backed iterator that yields one QuantumTensor per
batch."""
+ _validate_loader_args(
+ device_id=self._device_id,
+ num_qubits=self._num_qubits,
+ batch_size=self._batch_size,
+ total_batches=self._total_batches,
+ encoding_method=self._encoding_method,
+ seed=self._seed,
+ )
+ qdp = _get_qdp()
+ QdpEngine = getattr(qdp, "QdpEngine", None)
+ if QdpEngine is None:
+ raise RuntimeError(
+ "_qdp.QdpEngine not found. Build the extension with maturin
develop."
+ )
+ engine = QdpEngine(device_id=self._device_id)
+ create_synthetic_loader = getattr(engine, "create_synthetic_loader",
None)
+ if create_synthetic_loader is None:
+ raise RuntimeError(
+ "create_synthetic_loader not available (e.g. only on Linux
with CUDA)."
+ )
+ loader = create_synthetic_loader(
+ total_batches=self._total_batches,
+ batch_size=self._batch_size,
+ num_qubits=self._num_qubits,
+ encoding_method=self._encoding_method,
+ seed=self._seed,
+ )
+ return iter(loader)
diff --git a/qdp/qdp-python/src/lib.rs b/qdp/qdp-python/src/lib.rs
index 69743a862..ff1083f97 100644
--- a/qdp/qdp-python/src/lib.rs
+++ b/qdp/qdp-python/src/lib.rs
@@ -15,13 +15,21 @@
// limitations under the License.
use numpy::{PyReadonlyArray1, PyReadonlyArray2, PyUntypedArrayMethods};
-use pyo3::exceptions::PyRuntimeError;
+use pyo3::exceptions::{PyRuntimeError, PyStopIteration};
use pyo3::ffi;
use pyo3::prelude::*;
use qdp_core::dlpack::{DL_FLOAT, DLDeviceType, DLManagedTensor};
use qdp_core::{Precision, QdpEngine as CoreEngine};
use std::ffi::c_void;
+#[cfg(target_os = "linux")]
+use qdp_core::{PipelineConfig, PipelineIterator, PipelineRunResult,
run_throughput_pipeline};
+
+/// Wraps a raw DLPack pointer so it can cross `py.allow_threads` (the
closure's return value must be `Send`).
+/// Safety: the pointer is only handed over between contexts; the GIL is
released solely for the duration of the closure.
+struct SendPtr(pub *mut DLManagedTensor);
+unsafe impl Send for SendPtr {}
+
/// Quantum tensor wrapper implementing DLPack protocol
///
/// This class wraps a GPU-allocated quantum state vector and implements
@@ -112,9 +120,7 @@ impl QuantumTensor {
unsafe {
let tensor = &(*self.ptr).dl_tensor;
- // device_type is an enum, convert to integer
- // kDLCUDA = 2, kDLCPU = 1
- // Ref:
https://github.com/dmlc/dlpack/blob/6ea9b3eb64c881f614cd4537f95f0e125a35555c/include/dlpack/dlpack.h#L76-L80
+ // DLPack device_type: kDLCUDA = 2, kDLCPU = 1
let device_type = match tensor.device.device_type {
qdp_core::dlpack::DLDeviceType::kDLCUDA => 2,
qdp_core::dlpack::DLDeviceType::kDLCPU => 1,
@@ -151,6 +157,74 @@ impl Drop for QuantumTensor {
unsafe impl Send for QuantumTensor {}
unsafe impl Sync for QuantumTensor {}
+/// Python iterator yielding one QuantumTensor (batch) per __next__. Releases
GIL during next_batch().
+#[cfg(target_os = "linux")]
+#[pyclass]
+struct PyQuantumLoader {
+ inner: Option<PipelineIterator>,
+}
+
+#[cfg(target_os = "linux")]
+#[pymethods]
+impl PyQuantumLoader {
+ fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
+ slf
+ }
+
+ /// Returns the next batch as QuantumTensor; raises StopIteration when
exhausted. Releases GIL during encode.
+ fn __next__(mut slf: PyRefMut<'_, Self>, py: Python<'_>) ->
PyResult<QuantumTensor> {
+ let mut inner_iter = match slf.inner.take() {
+ Some(it) => it,
+ None => return Err(PyStopIteration::new_err("loader exhausted")),
+ };
+
+ #[allow(deprecated)]
+ let result = py.allow_threads(move || {
+ let res = inner_iter.next_batch();
+ match res {
+ Ok(Some(ptr)) => Ok((inner_iter, Some(SendPtr(ptr)))),
+ Ok(None) => Ok((inner_iter, None)),
+ Err(e) => Err((inner_iter, e)),
+ }
+ });
+
+ match result {
+ Ok((returned_iter, Some(send_ptr))) => {
+ slf.inner = Some(returned_iter);
+ Ok(QuantumTensor {
+ ptr: send_ptr.0,
+ consumed: false,
+ })
+ }
+ Ok((_, None)) => Err(PyStopIteration::new_err("loader exhausted")),
+ Err((returned_iter, e)) => {
+ slf.inner = Some(returned_iter);
+ Err(PyRuntimeError::new_err(e.to_string()))
+ }
+ }
+ }
+}
+
+/// Stub PyQuantumLoader when not on Linux (CUDA pipeline not available).
+#[cfg(not(target_os = "linux"))]
+#[pyclass]
+struct PyQuantumLoader {}
+
+#[cfg(not(target_os = "linux"))]
+#[pymethods]
+impl PyQuantumLoader {
+ fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
+ slf
+ }
+
+ fn __next__(&self, _py: Python<'_>) -> PyResult<QuantumTensor> {
+ Err(PyRuntimeError::new_err(
+ "QuantumDataLoader is only available on Linux (CUDA pipeline). \
+ Build and run from a Linux host with CUDA.",
+ ))
+ }
+}
+
/// Helper to detect PyTorch tensor
fn is_pytorch_tensor(obj: &Bound<'_, PyAny>) -> PyResult<bool> {
let type_obj = obj.get_type();
@@ -266,12 +340,8 @@ fn get_torch_cuda_stream_ptr(tensor: &Bound<'_, PyAny>) ->
PyResult<*mut c_void>
)));
}
+ // PyTorch default stream can report cuda_stream as 0; treat as valid
(Rust sync is no-op for null).
let stream_ptr: u64 = stream.getattr("cuda_stream")?.extract()?;
- if stream_ptr == 0 {
- return Err(PyRuntimeError::new_err(
- "PyTorch returned a null CUDA stream pointer",
- ));
- }
Ok(stream_ptr as *mut c_void)
}
@@ -1052,6 +1122,180 @@ impl QdpEngine {
consumed: false,
})
}
+
+ /// Create a synthetic-data loader iterator for use in Python `for qt in
loader`.
+ ///
+ /// Yields one QuantumTensor (batch) per iteration; releases GIL during
encode.
+ /// Use with QuantumDataLoader builder or directly for streaming encode.
+ ///
+ /// Args:
+ /// total_batches: Number of batches to yield
+ /// batch_size: Samples per batch
+ /// num_qubits: Qubits per sample
+ /// encoding_method: "amplitude", "angle", or "basis"
+ /// seed: Optional RNG seed for reproducible synthetic data
+ ///
+ /// Returns:
+ /// PyQuantumLoader: iterator yielding QuantumTensor per __next__
+ #[cfg(target_os = "linux")]
+ #[pyo3(signature = (total_batches, batch_size=64, num_qubits=16,
encoding_method="amplitude", seed=None))]
+ fn create_synthetic_loader(
+ &self,
+ total_batches: usize,
+ batch_size: usize,
+ num_qubits: u32,
+ encoding_method: &str,
+ seed: Option<u64>,
+ ) -> PyResult<PyQuantumLoader> {
+ let config = PipelineConfig {
+ device_id: self.engine.device().ordinal(),
+ num_qubits,
+ batch_size,
+ total_batches,
+ encoding_method: encoding_method.to_string(),
+ seed,
+ warmup_batches: 0,
+ };
+ let iter = PipelineIterator::new_synthetic(self.engine.clone(), config)
+ .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
+ Ok(PyQuantumLoader { inner: Some(iter) })
+ }
+
+ /// Stub when not on Linux: create_synthetic_loader is only implemented on
Linux.
+ #[cfg(not(target_os = "linux"))]
+ #[pyo3(signature = (total_batches, batch_size=64, num_qubits=16,
encoding_method="amplitude", seed=None))]
+ fn create_synthetic_loader(
+ &self,
+ total_batches: usize,
+ batch_size: usize,
+ num_qubits: u32,
+ encoding_method: &str,
+ seed: Option<u64>,
+ ) -> PyResult<PyQuantumLoader> {
+ let _ = (total_batches, batch_size, num_qubits, encoding_method, seed);
+ Err(PyRuntimeError::new_err(
+ "create_synthetic_loader is only available on Linux (CUDA
pipeline). \
+ Build and run from a Linux host with CUDA.",
+ ))
+ }
+
+ /// Run dual-stream pipeline for encoding (H2D + kernel overlap). Internal
API.
+ ///
+ /// Exposes run_dual_stream_pipeline from qdp-core. Accepts 1D host data
(single sample).
+ /// Does not return a tensor; use for throughput measurement or when state
is not needed.
+ /// Currently supports amplitude encoding only.
+ ///
+ /// Args:
+ /// host_data: 1D input (list or NumPy array, float64)
+ /// num_qubits: Number of qubits
+ /// encoding_method: "amplitude" (other methods not yet supported for
this path)
+ #[cfg(target_os = "linux")]
+ fn _encode_stream_internal(
+ &self,
+ host_data: &Bound<'_, PyAny>,
+ num_qubits: usize,
+ encoding_method: &str,
+ ) -> PyResult<()> {
+ let data_slice: Vec<f64> = if
host_data.hasattr("__array_interface__")? {
+ let array_1d =
host_data.extract::<PyReadonlyArray1<f64>>().map_err(|_| {
+ PyRuntimeError::new_err("host_data must be 1D NumPy array with
dtype float64")
+ })?;
+ array_1d
+ .as_slice()
+ .map_err(|_| PyRuntimeError::new_err("NumPy array must be
contiguous (C-order)"))?
+ .to_vec()
+ } else {
+ host_data.extract::<Vec<f64>>().map_err(|_| {
+ PyRuntimeError::new_err("host_data must be 1D list/array of
float64")
+ })?
+ };
+ self.engine
+ .run_dual_stream_encode(&data_slice, num_qubits, encoding_method)
+ .map_err(|e|
PyRuntimeError::new_err(format!("run_dual_stream_encode failed: {}", e)))
+ }
+}
+
+/// Runs the full throughput pipeline in Rust with GIL released. Returns
(duration_sec, vectors_per_sec, latency_ms_per_vector).
+#[cfg(target_os = "linux")]
+#[pyfunction]
+#[pyo3(signature = (device_id=0, num_qubits=16, batch_size=64,
total_batches=100, encoding_method="amplitude", warmup_batches=0, seed=None))]
+#[allow(clippy::too_many_arguments)]
+fn run_throughput_pipeline_py_impl(
+ py: Python<'_>,
+ device_id: usize,
+ num_qubits: u32,
+ batch_size: usize,
+ total_batches: usize,
+ encoding_method: &str,
+ warmup_batches: usize,
+ seed: Option<u64>,
+) -> PyResult<(f64, f64, f64)> {
+ let encoding_method = encoding_method.to_string();
+ #[allow(deprecated)]
+ let result: Result<PipelineRunResult, qdp_core::MahoutError> =
py.allow_threads(move || {
+ let config = PipelineConfig {
+ device_id,
+ num_qubits,
+ batch_size,
+ total_batches,
+ encoding_method,
+ seed,
+ warmup_batches,
+ };
+ run_throughput_pipeline(&config)
+ });
+ let res = result.map_err(|e: qdp_core::MahoutError|
PyRuntimeError::new_err(e.to_string()))?;
+ Ok((
+ res.duration_sec,
+ res.vectors_per_sec,
+ res.latency_ms_per_vector,
+ ))
+}
+
+/// Stub when not on Linux: run_throughput_pipeline_py is only implemented on
Linux (CUDA pipeline).
+#[cfg(not(target_os = "linux"))]
+#[pyfunction]
+#[pyo3(signature = (device_id=0, num_qubits=16, batch_size=64,
total_batches=100, encoding_method="amplitude", warmup_batches=0, seed=None))]
+fn run_throughput_pipeline_py_impl(
+ _py: Python<'_>,
+ _device_id: usize,
+ _num_qubits: u32,
+ _batch_size: usize,
+ _total_batches: usize,
+ _encoding_method: &str,
+ _warmup_batches: usize,
+ _seed: Option<u64>,
+) -> PyResult<(f64, f64, f64)> {
+ Err(PyRuntimeError::new_err(
+ "run_throughput_pipeline_py is only available on Linux (CUDA
pipeline). \
+ Build and run from a Linux host with CUDA.",
+ ))
+}
+
+/// Public wrapper so the same name is always present in the module (import
never fails).
+#[pyfunction]
+#[pyo3(signature = (device_id=0, num_qubits=16, batch_size=64,
total_batches=100, encoding_method="amplitude", warmup_batches=0, seed=None))]
+#[allow(clippy::too_many_arguments)]
+fn run_throughput_pipeline_py(
+ py: Python<'_>,
+ device_id: usize,
+ num_qubits: u32,
+ batch_size: usize,
+ total_batches: usize,
+ encoding_method: &str,
+ warmup_batches: usize,
+ seed: Option<u64>,
+) -> PyResult<(f64, f64, f64)> {
+ run_throughput_pipeline_py_impl(
+ py,
+ device_id,
+ num_qubits,
+ batch_size,
+ total_batches,
+ encoding_method,
+ warmup_batches,
+ seed,
+ )
}
/// Quantum Data Plane (QDP) Python module
@@ -1059,13 +1303,12 @@ impl QdpEngine {
/// GPU-accelerated quantum data encoding with DLPack integration.
#[pymodule]
fn _qdp(m: &Bound<'_, PyModule>) -> PyResult<()> {
- // Initialize Rust logging system - respect RUST_LOG environment variable
- // Ref: https://docs.rs/env_logger/latest/env_logger/
- // try_init() won't fail if logger is already initialized (e.g., by
another library)
- // This allows Rust log messages to be visible when RUST_LOG is set
+ // Respect RUST_LOG; try_init() is idempotent if already initialized
let _ = env_logger::Builder::from_default_env().try_init();
m.add_class::<QdpEngine>()?;
m.add_class::<QuantumTensor>()?;
+ m.add_class::<PyQuantumLoader>()?;
+ m.add_function(pyo3::wrap_pyfunction!(run_throughput_pipeline_py, m)?)?;
Ok(())
}
diff --git a/testing/qdp/test_benchmark_api.py
b/testing/qdp/test_benchmark_api.py
new file mode 100644
index 000000000..d75f3a499
--- /dev/null
+++ b/testing/qdp/test_benchmark_api.py
@@ -0,0 +1,115 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Tests for the benchmark API (QdpBenchmark, Rust pipeline only)."""
+
+from pathlib import Path
+
+import pytest
+
+# Allow importing benchmark API from qdp-python/benchmark and qumat_qdp from
qdp-python
+_sys = __import__("sys")
+_qdp_python = Path(__file__).resolve().parent.parent.parent / "qdp" /
"qdp-python"
+_bench_dir = _qdp_python / "benchmark"
+if _qdp_python.exists() and str(_qdp_python) not in _sys.path:
+ _sys.path.insert(0, str(_qdp_python))
+if _bench_dir.exists() and str(_bench_dir) not in _sys.path:
+ _sys.path.insert(0, str(_bench_dir))
+
+from .qdp_test_utils import requires_qdp # noqa: E402
+
+
+@requires_qdp
+def test_benchmark_api_import():
+ """Test that the benchmark API exports only Rust-pipeline path (no
encode_stream / Python loop)."""
+ import api
+
+ assert hasattr(api, "QdpBenchmark")
+ assert hasattr(api, "ThroughputResult")
+ assert hasattr(api, "LatencyResult")
+ # No naive Python for-loop API
+ assert not hasattr(api, "encode_stream")
+ assert not hasattr(api, "create_pipeline")
+ assert not hasattr(api, "StreamPipeline")
+ assert not hasattr(api, "PipelineConfig")
+
+
+@requires_qdp
[email protected]
+def test_qdp_benchmark_run_throughput():
+ """QdpBenchmark.run_throughput() calls Rust pipeline and returns
ThroughputResult (requires GPU)."""
+ import api
+
+ result = (
+ api.QdpBenchmark(device_id=0)
+ .qubits(2)
+ .encoding("amplitude")
+ .batches(2, size=4)
+ .prefetch(2)
+ .run_throughput()
+ )
+ assert isinstance(result, api.ThroughputResult)
+ assert result.duration_sec >= 0
+ assert result.vectors_per_sec > 0
+
+
+@requires_qdp
[email protected]
+def test_qdp_benchmark_run_latency():
+ """QdpBenchmark.run_latency() calls Rust pipeline and returns
LatencyResult (requires GPU)."""
+ import api
+
+ result = (
+ api.QdpBenchmark(device_id=0)
+ .qubits(2)
+ .encoding("amplitude")
+ .batches(2, size=4)
+ .prefetch(2)
+ .run_latency()
+ )
+ assert isinstance(result, api.LatencyResult)
+ assert result.duration_sec >= 0
+ assert result.latency_ms_per_vector > 0
+
+
+@requires_qdp
[email protected]("method", ["run_throughput", "run_latency"])
+def test_qdp_benchmark_validation(method):
+ """QdpBenchmark.run_throughput() and run_latency() raise if qubits/batches
not set."""
+ import api
+
+ bench = api.QdpBenchmark(device_id=0)
+ runner = getattr(bench, method)
+ with pytest.raises(ValueError, match="qubits and batches"):
+ runner()
+
+
+@requires_qdp
[email protected]
+def test_qdp_benchmark_device_id_propagated():
+ """QdpBenchmark(device_id=...) propagates device_id to Rust pipeline when
running."""
+ import api
+
+ # When qubits/batches are set, run_throughput uses the bench's device_id
(e.g. 0).
+ result = (
+ api.QdpBenchmark(device_id=0)
+ .qubits(2)
+ .encoding("amplitude")
+ .batches(2, size=4)
+ .run_throughput()
+ )
+ assert hasattr(result, "vectors_per_sec")
+ assert result.vectors_per_sec >= 0
diff --git a/testing/qdp/test_bindings.py b/testing/qdp/test_bindings.py
index 0e692a366..a0f043456 100644
--- a/testing/qdp/test_bindings.py
+++ b/testing/qdp/test_bindings.py
@@ -438,9 +438,9 @@ def test_encode_cuda_tensor_preserves_input(data_shape,
is_batch):
@requires_qdp
@pytest.mark.gpu
[email protected]("encoding_method", ["basis", "angle"])
[email protected]("encoding_method", ["basis"])
def test_encode_cuda_tensor_unsupported_encoding(encoding_method):
- """Test error when using CUDA tensor with unsupported encoding method."""
+ """Test error when using CUDA tensor with unsupported encoding (CUDA
supports amplitude and angle only)."""
pytest.importorskip("torch")
import torch
from _qdp import QdpEngine
@@ -454,7 +454,10 @@ def
test_encode_cuda_tensor_unsupported_encoding(encoding_method):
# Use non-zero data to avoid normalization issues
data = torch.tensor([1.0, 0.0, 0.0, 0.0], dtype=torch.float64,
device="cuda:0")
- with pytest.raises(RuntimeError, match="only supports 'amplitude' method"):
+ with pytest.raises(
+ RuntimeError,
+ match="only supports 'amplitude' and 'angle' methods.*Use
tensor.cpu\\(\\)",
+ ):
engine.encode(data, 2, encoding_method)