This is an automated email from the ASF dual-hosted git repository.
richhuang pushed a commit to branch dev-qdp
in repository https://gitbox.apache.org/repos/asf/mahout.git
The following commit(s) were added to refs/heads/dev-qdp by this push:
new 167f74333 [QDP] add Rust Linter to pre-commit hook (#758)
167f74333 is described below
commit 167f74333791df6d0e765fa1b8f69242fe2f6dd2
Author: Ryan Huang <[email protected]>
AuthorDate: Mon Dec 29 10:07:44 2025 +0800
[QDP] add Rust Linter to pre-commit hook (#758)
* add rust linter
* pre-commit
Signed-off-by: Hsien-Cheng Huang <[email protected]>
---------
Signed-off-by: Hsien-Cheng Huang <[email protected]>
---
.pre-commit-config.yaml | 16 ++
qdp/qdp-core/examples/dataloader_throughput.rs | 4 +-
qdp/qdp-core/examples/nvtx_profile.rs | 5 +-
qdp/qdp-core/src/dlpack.rs | 20 ++-
qdp/qdp-core/src/gpu/encodings/amplitude.rs | 222 +++++++++++++-----------
qdp/qdp-core/src/gpu/encodings/angle.rs | 8 +-
qdp/qdp-core/src/gpu/encodings/basis.rs | 8 +-
qdp/qdp-core/src/gpu/encodings/mod.rs | 16 +-
qdp/qdp-core/src/gpu/memory.rs | 186 +++++++++++++-------
qdp/qdp-core/src/gpu/mod.rs | 4 +-
qdp/qdp-core/src/gpu/pipeline.rs | 72 ++++----
qdp/qdp-core/src/io.rs | 230 ++++++++++++-------------
qdp/qdp-core/src/lib.rs | 168 ++++++++++++------
qdp/qdp-core/src/preprocessing.rs | 58 ++++---
qdp/qdp-core/src/profiling.rs | 1 -
qdp/qdp-core/tests/api_workflow.rs | 79 +++++++--
qdp/qdp-core/tests/arrow_ipc_io.rs | 63 +++----
qdp/qdp-core/tests/memory_safety.rs | 15 +-
qdp/qdp-core/tests/parquet_io.rs | 4 +-
qdp/qdp-core/tests/preprocessing.rs | 14 +-
qdp/qdp-core/tests/validation.rs | 35 ++--
qdp/qdp-kernels/build.rs | 16 +-
qdp/qdp-kernels/src/lib.rs | 8 +-
qdp/qdp-kernels/tests/amplitude_encode.rs | 43 +++--
qdp/qdp-python/src/lib.rs | 49 ++++--
25 files changed, 818 insertions(+), 526 deletions(-)
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index c5208b3a9..42d2e79c4 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -50,3 +50,19 @@ repos:
- testing/utils/.license-header.txt
- --comment-style
- "//"
+
+# Rust Linter
+ - repo: local
+ hooks:
+ - id: rust-fmt
+ name: rustfmt
+ entry: bash -c 'cd qdp && cargo fmt --all'
+ language: system
+ types: [rust]
+ pass_filenames: false
+ - id: rust-clippy
+ name: clippy
+ entry: cargo clippy --manifest-path qdp/Cargo.toml --all-targets --all-features --fix --allow-dirty --allow-staged -- -D warnings
+ language: system
+ types: [rust]
+ pass_filenames: false
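
For local verification, a minimal sketch of exercising the new hooks outside of a commit (assuming the pre-commit tool is installed and the commands are run from the repository root; the clippy line mirrors the hook entry above but omits its --fix/--allow-dirty/--allow-staged flags so it only checks):

    # install the hooks into .git/hooks once
    pre-commit install
    # run every configured hook against the whole tree
    pre-commit run --all-files
    # or invoke the underlying tools directly
    (cd qdp && cargo fmt --all)
    cargo clippy --manifest-path qdp/Cargo.toml --all-targets --all-features -- -D warnings
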
diff --git a/qdp/qdp-core/examples/dataloader_throughput.rs b/qdp/qdp-core/examples/dataloader_throughput.rs
index f13d85bac..029caaa1f 100644
--- a/qdp/qdp-core/examples/dataloader_throughput.rs
+++ b/qdp/qdp-core/examples/dataloader_throughput.rs
@@ -108,9 +108,7 @@ fn main() {
Err(e) => {
eprintln!(
"Encode failed on batch {} (vector {}): {:?}",
- batch_idx,
- total_vectors,
- e
+ batch_idx, total_vectors, e
);
return;
}
diff --git a/qdp/qdp-core/examples/nvtx_profile.rs b/qdp/qdp-core/examples/nvtx_profile.rs
index c463f3f08..3e5c0c050 100644
--- a/qdp/qdp-core/examples/nvtx_profile.rs
+++ b/qdp/qdp-core/examples/nvtx_profile.rs
@@ -14,7 +14,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-
// NVTX profiling example
// Run: cargo run -p qdp-core --example nvtx_profile --features observability --release
@@ -74,6 +73,8 @@ fn main() {
println!("=== Test Complete ===");
println!();
println!("To view NVTX markers, use Nsight Systems:");
- println!(" nsys profile --trace=cuda,nvtx cargo run -p qdp-core --example
nvtx_profile --features observability --release");
+ println!(
+ " nsys profile --trace=cuda,nvtx cargo run -p qdp-core --example
nvtx_profile --features observability --release"
+ );
println!("Then open the generated .nsys-rep file in Nsight Systems");
}
diff --git a/qdp/qdp-core/src/dlpack.rs b/qdp/qdp-core/src/dlpack.rs
index e84630ca6..4d3ac764d 100644
--- a/qdp/qdp-core/src/dlpack.rs
+++ b/qdp/qdp-core/src/dlpack.rs
@@ -16,9 +16,9 @@
// DLPack protocol for zero-copy GPU memory sharing with PyTorch
+use crate::gpu::memory::{BufferStorage, GpuStateVector, Precision};
use std::os::raw::{c_int, c_void};
use std::sync::Arc;
-use crate::gpu::memory::{BufferStorage, GpuStateVector, Precision};
// DLPack C structures (matching dlpack/dlpack.h)
@@ -38,7 +38,7 @@ pub struct DLDevice {
#[repr(C)]
pub struct DLDataType {
- pub code: u8, // kDLInt=0, kDLUInt=1, kDLFloat=2, kDLBfloat=4, kDLComplex=5
+ pub code: u8, // kDLInt=0, kDLUInt=1, kDLFloat=2, kDLBfloat=4, kDLComplex=5
pub bits: u8,
pub lanes: u16,
}
@@ -89,14 +89,22 @@ pub unsafe extern "C" fn dlpack_deleter(managed: *mut DLManagedTensor) {
// 1. Free shape array (Box<[i64]>)
if !tensor.shape.is_null() {
- let len = if tensor.ndim > 0 { tensor.ndim as usize } else { 1 };
+ let len = if tensor.ndim > 0 {
+ tensor.ndim as usize
+ } else {
+ 1
+ };
let slice_ptr: *mut [i64] =
std::ptr::slice_from_raw_parts_mut(tensor.shape, len);
let _ = Box::from_raw(slice_ptr);
}
// 2. Free strides array
if !tensor.strides.is_null() {
- let len = if tensor.ndim > 0 { tensor.ndim as usize } else { 1 };
+ let len = if tensor.ndim > 0 {
+ tensor.ndim as usize
+ } else {
+ 1
+ };
let slice_ptr: *mut [i64] =
std::ptr::slice_from_raw_parts_mut(tensor.strides, len);
let _ = Box::from_raw(slice_ptr);
}
@@ -124,7 +132,7 @@ impl GpuStateVector {
let (shape, strides) = if let Some(num_samples) = self.num_samples {
// Batch: [num_samples, state_len_per_sample]
debug_assert!(
- num_samples > 0 && self.size_elements % num_samples == 0,
+ num_samples > 0 && self.size_elements.is_multiple_of(num_samples),
"Batch state vector size must be divisible by num_samples"
);
let state_len_per_sample = self.size_elements / num_samples;
@@ -148,7 +156,7 @@ impl GpuStateVector {
let ctx = Arc::into_raw(self.buffer.clone()) as *mut c_void;
let dtype_bits = match self.precision() {
- Precision::Float32 => 64, // complex64 (2x float32)
+ Precision::Float32 => 64, // complex64 (2x float32)
Precision::Float64 => 128, // complex128 (2x float64)
};
diff --git a/qdp/qdp-core/src/gpu/encodings/amplitude.rs b/qdp/qdp-core/src/gpu/encodings/amplitude.rs
index 70b38895e..f6a02b0db 100644
--- a/qdp/qdp-core/src/gpu/encodings/amplitude.rs
+++ b/qdp/qdp-core/src/gpu/encodings/amplitude.rs
@@ -18,27 +18,24 @@
use std::sync::Arc;
-use cudarc::driver::CudaDevice;
+use super::QuantumEncoder;
use crate::error::{MahoutError, Result};
use crate::gpu::memory::GpuStateVector;
use crate::gpu::pipeline::run_dual_stream_pipeline;
-use super::QuantumEncoder;
+use cudarc::driver::CudaDevice;
-#[cfg(target_os = "linux")]
-use std::ffi::c_void;
#[cfg(target_os = "linux")]
use crate::gpu::cuda_ffi::cudaMemsetAsync;
#[cfg(target_os = "linux")]
+use crate::gpu::memory::{ensure_device_memory_available, map_allocation_error};
+#[cfg(target_os = "linux")]
use cudarc::driver::{DevicePtr, DevicePtrMut};
#[cfg(target_os = "linux")]
use qdp_kernels::{
- launch_amplitude_encode,
- launch_amplitude_encode_batch,
- launch_l2_norm,
- launch_l2_norm_batch,
+ launch_amplitude_encode, launch_amplitude_encode_batch, launch_l2_norm, launch_l2_norm_batch,
};
#[cfg(target_os = "linux")]
-use crate::gpu::memory::{ensure_device_memory_available, map_allocation_error};
+use std::ffi::c_void;
use crate::preprocessing::Preprocessor;
@@ -75,18 +72,23 @@ impl QuantumEncoder for AmplitudeEncoder {
if host_data.len() < ASYNC_THRESHOLD {
// Synchronous path for small data (avoids stream overhead)
- let input_bytes = host_data.len() * std::mem::size_of::<f64>();
- ensure_device_memory_available(input_bytes, "input staging buffer", Some(num_qubits))?;
+ let input_bytes = std::mem::size_of_val(host_data);
+ ensure_device_memory_available(
+ input_bytes,
+ "input staging buffer",
+ Some(num_qubits),
+ )?;
let input_slice = {
crate::profile_scope!("GPU::H2DCopy");
- _device.htod_sync_copy(host_data)
- .map_err(|e| map_allocation_error(
+ _device.htod_sync_copy(host_data).map_err(|e| {
+ map_allocation_error(
input_bytes,
"input staging buffer",
Some(num_qubits),
e,
- ))?
+ )
+ })?
};
// GPU-accelerated norm for medium+ inputs, CPU fallback for tiny payloads
@@ -142,15 +144,22 @@ impl QuantumEncoder for AmplitudeEncoder {
{
crate::profile_scope!("GPU::Synchronize");
- _device
- .synchronize()
- .map_err(|e| MahoutError::Cuda(format!("CUDA device
synchronize failed: {:?}", e)))?;
+ _device.synchronize().map_err(|e| {
+ MahoutError::Cuda(format!("CUDA device synchronize
failed: {:?}", e))
+ })?;
}
} else {
// Async Pipeline path for large data
let norm = Preprocessor::calculate_l2_norm(host_data)?;
let inv_norm = 1.0 / norm;
- Self::encode_async_pipeline(_device, host_data, num_qubits, state_len, inv_norm, &state_vector)?;
+ Self::encode_async_pipeline(
+ _device,
+ host_data,
+ num_qubits,
+ state_len,
+ inv_norm,
+ &state_vector,
+ )?;
}
Ok(state_vector)
@@ -158,7 +167,9 @@ impl QuantumEncoder for AmplitudeEncoder {
#[cfg(not(target_os = "linux"))]
{
- Err(MahoutError::Cuda("CUDA unavailable (non-Linux)".to_string()))
+ Err(MahoutError::Cuda(
+ "CUDA unavailable (non-Linux)".to_string(),
+ ))
}
}
@@ -188,19 +199,17 @@ impl QuantumEncoder for AmplitudeEncoder {
// Upload input data to GPU
let input_batch_gpu = {
crate::profile_scope!("GPU::H2D_InputBatch");
- device.htod_sync_copy(batch_data)
- .map_err(|e| MahoutError::MemoryAllocation(
- format!("Failed to upload batch input: {:?}", e)
- ))?
+ device.htod_sync_copy(batch_data).map_err(|e| {
+ MahoutError::MemoryAllocation(format!("Failed to upload batch
input: {:?}", e))
+ })?
};
// Compute inverse norms on GPU using warp-reduced kernel
let inv_norms_gpu = {
crate::profile_scope!("GPU::BatchNormKernel");
- let mut buffer = device.alloc_zeros::<f64>(num_samples)
- .map_err(|e| MahoutError::MemoryAllocation(
- format!("Failed to allocate norm buffer: {:?}", e)
- ))?;
+ let mut buffer =
device.alloc_zeros::<f64>(num_samples).map_err(|e| {
+ MahoutError::MemoryAllocation(format!("Failed to allocate norm
buffer: {:?}", e))
+ })?;
let ret = unsafe {
launch_l2_norm_batch(
@@ -213,9 +222,11 @@ impl QuantumEncoder for AmplitudeEncoder {
};
if ret != 0 {
- return Err(MahoutError::KernelLaunch(
- format!("Norm reduction kernel failed: {} ({})", ret,
cuda_error_to_string(ret))
- ));
+ return Err(MahoutError::KernelLaunch(format!(
+ "Norm reduction kernel failed: {} ({})",
+ ret,
+ cuda_error_to_string(ret)
+ )));
}
buffer
@@ -224,12 +235,13 @@ impl QuantumEncoder for AmplitudeEncoder {
// Validate norms on host to catch zero or NaN samples early
{
crate::profile_scope!("GPU::NormValidation");
- let host_inv_norms = device.dtoh_sync_copy(&inv_norms_gpu)
+ let host_inv_norms = device
+ .dtoh_sync_copy(&inv_norms_gpu)
.map_err(|e| MahoutError::Cuda(format!("Failed to copy norms
to host: {:?}", e)))?;
if host_inv_norms.iter().any(|v| !v.is_finite() || *v == 0.0) {
return Err(MahoutError::InvalidInput(
- "One or more samples have zero or invalid norm".to_string()
+ "One or more samples have zero or invalid
norm".to_string(),
));
}
}
@@ -237,9 +249,11 @@ impl QuantumEncoder for AmplitudeEncoder {
// Launch batch kernel
{
crate::profile_scope!("GPU::BatchKernelLaunch");
- let state_ptr = batch_state_vector.ptr_f64().ok_or_else(||
MahoutError::InvalidInput(
- "Batch state vector precision mismatch (expected float64
buffer)".to_string()
- ))?;
+ let state_ptr = batch_state_vector.ptr_f64().ok_or_else(|| {
+ MahoutError::InvalidInput(
+ "Batch state vector precision mismatch (expected float64
buffer)".to_string(),
+ )
+ })?;
let ret = unsafe {
launch_amplitude_encode_batch(
*input_batch_gpu.device_ptr() as *const f64,
@@ -253,16 +267,19 @@ impl QuantumEncoder for AmplitudeEncoder {
};
if ret != 0 {
- return Err(MahoutError::KernelLaunch(
- format!("Batch kernel launch failed: {} ({})", ret,
cuda_error_to_string(ret))
- ));
+ return Err(MahoutError::KernelLaunch(format!(
+ "Batch kernel launch failed: {} ({})",
+ ret,
+ cuda_error_to_string(ret)
+ )));
}
}
// Synchronize
{
crate::profile_scope!("GPU::Synchronize");
- device.synchronize()
+ device
+ .synchronize()
.map_err(|e| MahoutError::Cuda(format!("Sync failed: {:?}",
e)))?;
}
@@ -279,8 +296,6 @@ impl QuantumEncoder for AmplitudeEncoder {
}
impl AmplitudeEncoder {
-
-
/// Async pipeline encoding for large data
///
/// Uses the generic dual-stream pipeline infrastructure to overlap
@@ -296,52 +311,58 @@ impl AmplitudeEncoder {
inv_norm: f64,
state_vector: &GpuStateVector,
) -> Result<()> {
- let base_state_ptr = state_vector.ptr_f64().ok_or_else(||
MahoutError::InvalidInput(
- "State vector precision mismatch (expected float64
buffer)".to_string()
- ))?;
+ let base_state_ptr = state_vector.ptr_f64().ok_or_else(|| {
+ MahoutError::InvalidInput(
+ "State vector precision mismatch (expected float64
buffer)".to_string(),
+ )
+ })?;
// Use generic pipeline infrastructure
// The closure handles amplitude-specific kernel launch logic
- run_dual_stream_pipeline(device, host_data, |stream, input_ptr,
chunk_offset, chunk_len| {
- // Calculate offset pointer for state vector (type-safe pointer
arithmetic)
- // Offset is in complex numbers (CuDoubleComplex), not f64 elements
- let state_ptr_offset = unsafe {
- base_state_ptr.cast::<u8>()
- .add(chunk_offset *
std::mem::size_of::<qdp_kernels::CuDoubleComplex>())
- .cast::<std::ffi::c_void>()
- };
-
- // Launch amplitude encoding kernel on the provided stream
- let ret = unsafe {
- launch_amplitude_encode(
- input_ptr,
- state_ptr_offset,
- chunk_len,
- state_len,
- inv_norm,
- stream.stream as *mut c_void,
- )
- };
+ run_dual_stream_pipeline(
+ device,
+ host_data,
+ |stream, input_ptr, chunk_offset, chunk_len| {
+ // Calculate offset pointer for state vector (type-safe
pointer arithmetic)
+ // Offset is in complex numbers (CuDoubleComplex), not f64
elements
+ let state_ptr_offset = unsafe {
+ base_state_ptr
+ .cast::<u8>()
+ .add(chunk_offset *
std::mem::size_of::<qdp_kernels::CuDoubleComplex>())
+ .cast::<std::ffi::c_void>()
+ };
- if ret != 0 {
- let error_msg = if ret == 2 {
- format!(
- "Kernel launch reported cudaErrorMemoryAllocation
(likely OOM) while encoding chunk starting at offset {} (len={}).",
- chunk_offset,
- chunk_len
- )
- } else {
- format!(
- "Kernel launch failed with CUDA error code: {} ({})",
- ret,
- cuda_error_to_string(ret)
+ // Launch amplitude encoding kernel on the provided stream
+ let ret = unsafe {
+ launch_amplitude_encode(
+ input_ptr,
+ state_ptr_offset,
+ chunk_len,
+ state_len,
+ inv_norm,
+ stream.stream as *mut c_void,
)
};
- return Err(MahoutError::KernelLaunch(error_msg));
- }
- Ok(())
- })?;
+ if ret != 0 {
+ let error_msg = if ret == 2 {
+ format!(
+ "Kernel launch reported cudaErrorMemoryAllocation
(likely OOM) while encoding chunk starting at offset {} (len={}).",
+ chunk_offset, chunk_len
+ )
+ } else {
+ format!(
+ "Kernel launch failed with CUDA error code: {}
({})",
+ ret,
+ cuda_error_to_string(ret)
+ )
+ };
+ return Err(MahoutError::KernelLaunch(error_msg));
+ }
+
+ Ok(())
+ },
+ )?;
// CRITICAL FIX: Handle padding for uninitialized memory
// Since we use alloc() (uninitialized), we must zero-fill any tail
region
@@ -351,12 +372,11 @@ impl AmplitudeEncoder {
if data_len < state_len {
let padding_start = data_len;
let padding_elements = state_len - padding_start;
- let padding_bytes = padding_elements *
std::mem::size_of::<qdp_kernels::CuDoubleComplex>();
+ let padding_bytes =
+ padding_elements *
std::mem::size_of::<qdp_kernels::CuDoubleComplex>();
// Calculate tail pointer (in complex numbers)
- let tail_ptr = unsafe {
- base_state_ptr.add(padding_start) as *mut c_void
- };
+ let tail_ptr = unsafe { base_state_ptr.add(padding_start) as *mut
c_void };
// Zero-fill padding region using CUDA Runtime API
// Use default stream since pipeline streams are already
synchronized
@@ -369,15 +389,17 @@ impl AmplitudeEncoder {
);
if result != 0 {
- return Err(MahoutError::Cuda(
- format!("Failed to zero-fill padding region: {} ({})",
- result, cuda_error_to_string(result))
- ));
+ return Err(MahoutError::Cuda(format!(
+ "Failed to zero-fill padding region: {} ({})",
+ result,
+ cuda_error_to_string(result)
+ )));
}
}
// Synchronize to ensure padding is complete before returning
- device.synchronize()
+ device
+ .synchronize()
.map_err(|e| MahoutError::Cuda(format!("Failed to sync after
padding: {:?}", e)))?;
}
@@ -395,10 +417,9 @@ impl AmplitudeEncoder {
) -> Result<f64> {
crate::profile_scope!("GPU::NormSingle");
- let mut norm_buffer = device.alloc_zeros::<f64>(1)
- .map_err(|e| MahoutError::MemoryAllocation(
- format!("Failed to allocate norm buffer: {:?}", e)
- ))?;
+ let mut norm_buffer = device.alloc_zeros::<f64>(1).map_err(|e| {
+ MahoutError::MemoryAllocation(format!("Failed to allocate norm
buffer: {:?}", e))
+ })?;
let ret = unsafe {
launch_l2_norm(
@@ -410,18 +431,21 @@ impl AmplitudeEncoder {
};
if ret != 0 {
- return Err(MahoutError::KernelLaunch(
- format!("Norm kernel failed: {} ({})", ret,
cuda_error_to_string(ret))
- ));
+ return Err(MahoutError::KernelLaunch(format!(
+ "Norm kernel failed: {} ({})",
+ ret,
+ cuda_error_to_string(ret)
+ )));
}
- let inv_norm_host = device.dtoh_sync_copy(&norm_buffer)
+ let inv_norm_host = device
+ .dtoh_sync_copy(&norm_buffer)
.map_err(|e| MahoutError::Cuda(format!("Failed to copy norm to
host: {:?}", e)))?;
- let inv_norm = inv_norm_host.get(0).copied().unwrap_or(0.0);
+ let inv_norm = inv_norm_host.first().copied().unwrap_or(0.0);
if inv_norm == 0.0 || !inv_norm.is_finite() {
return Err(MahoutError::InvalidInput(
- "Input data has zero norm".to_string()
+ "Input data has zero norm".to_string(),
));
}
diff --git a/qdp/qdp-core/src/gpu/encodings/angle.rs b/qdp/qdp-core/src/gpu/encodings/angle.rs
index 0c2ed8c01..d35dfec54 100644
--- a/qdp/qdp-core/src/gpu/encodings/angle.rs
+++ b/qdp/qdp-core/src/gpu/encodings/angle.rs
@@ -17,11 +17,11 @@
// Angle encoding (placeholder)
// TODO: Rotation-based encoding via tensor product
-use std::sync::Arc;
-use cudarc::driver::CudaDevice;
+use super::QuantumEncoder;
use crate::error::{MahoutError, Result};
use crate::gpu::memory::GpuStateVector;
-use super::QuantumEncoder;
+use cudarc::driver::CudaDevice;
+use std::sync::Arc;
/// Angle encoding (not implemented)
/// TODO: Use sin/cos for rotation-based states
@@ -36,7 +36,7 @@ impl QuantumEncoder for AngleEncoder {
) -> Result<GpuStateVector> {
self.validate_input(data, num_qubits)?;
Err(MahoutError::InvalidInput(
- "Angle encoding not yet implemented. Use 'amplitude' encoding for
now.".to_string()
+ "Angle encoding not yet implemented. Use 'amplitude' encoding for
now.".to_string(),
))
}
diff --git a/qdp/qdp-core/src/gpu/encodings/basis.rs b/qdp/qdp-core/src/gpu/encodings/basis.rs
index 9fcf83034..fec482174 100644
--- a/qdp/qdp-core/src/gpu/encodings/basis.rs
+++ b/qdp/qdp-core/src/gpu/encodings/basis.rs
@@ -17,11 +17,11 @@
// Basis encoding (placeholder)
// TODO: Map integers to computational basis states
-use std::sync::Arc;
-use cudarc::driver::CudaDevice;
+use super::QuantumEncoder;
use crate::error::{MahoutError, Result};
use crate::gpu::memory::GpuStateVector;
-use super::QuantumEncoder;
+use cudarc::driver::CudaDevice;
+use std::sync::Arc;
/// Basis encoding (not implemented)
/// TODO: Map integers to basis states (e.g., 3 → |011⟩)
@@ -35,7 +35,7 @@ impl QuantumEncoder for BasisEncoder {
_num_qubits: usize,
) -> Result<GpuStateVector> {
Err(MahoutError::InvalidInput(
- "Basis encoding not yet implemented. Use 'amplitude' encoding for
now.".to_string()
+ "Basis encoding not yet implemented. Use 'amplitude' encoding for
now.".to_string(),
))
}
diff --git a/qdp/qdp-core/src/gpu/encodings/mod.rs b/qdp/qdp-core/src/gpu/encodings/mod.rs
index 7273179d0..295e09503 100644
--- a/qdp/qdp-core/src/gpu/encodings/mod.rs
+++ b/qdp/qdp-core/src/gpu/encodings/mod.rs
@@ -18,10 +18,10 @@
use std::sync::Arc;
-use cudarc::driver::CudaDevice;
use crate::error::Result;
use crate::gpu::memory::GpuStateVector;
use crate::preprocessing::Preprocessor;
+use cudarc::driver::CudaDevice;
/// Quantum encoding strategy interface
/// Implemented by: AmplitudeEncoder, AngleEncoder, BasisEncoder
@@ -43,9 +43,10 @@ pub trait QuantumEncoder: Send + Sync {
_sample_size: usize,
_num_qubits: usize,
) -> Result<GpuStateVector> {
- Err(crate::error::MahoutError::NotImplemented(
- format!("Batch encoding not implemented for {}", self.name())
- ))
+ Err(crate::error::MahoutError::NotImplemented(format!(
+ "Batch encoding not implemented for {}",
+ self.name()
+ )))
}
/// Validate input data before encoding
@@ -75,8 +76,9 @@ pub fn get_encoder(name: &str) -> Result<Box<dyn QuantumEncoder>> {
"amplitude" => Ok(Box::new(AmplitudeEncoder)),
"angle" => Ok(Box::new(AngleEncoder)),
"basis" => Ok(Box::new(BasisEncoder)),
- _ => Err(crate::error::MahoutError::InvalidInput(
- format!("Unknown encoder: {}. Available: amplitude, angle, basis",
name)
- )),
+ _ => Err(crate::error::MahoutError::InvalidInput(format!(
+ "Unknown encoder: {}. Available: amplitude, angle, basis",
+ name
+ ))),
}
}
diff --git a/qdp/qdp-core/src/gpu/memory.rs b/qdp/qdp-core/src/gpu/memory.rs
index 97e3d9cbf..5944c8671 100644
--- a/qdp/qdp-core/src/gpu/memory.rs
+++ b/qdp/qdp-core/src/gpu/memory.rs
@@ -13,11 +13,11 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-use std::ffi::c_void;
-use std::sync::Arc;
+use crate::error::{MahoutError, Result};
use cudarc::driver::{CudaDevice, CudaSlice, DevicePtr};
use qdp_kernels::{CuComplex, CuDoubleComplex};
-use crate::error::{MahoutError, Result};
+use std::ffi::c_void;
+use std::sync::Arc;
/// Precision of the GPU state vector.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
@@ -50,7 +50,10 @@ fn query_cuda_mem_info() -> Result<(usize, usize)> {
unsafe {
let mut free_bytes: usize = 0;
let mut total_bytes: usize = 0;
- let result = cudaMemGetInfo(&mut free_bytes as *mut usize, &mut
total_bytes as *mut usize);
+ let result = cudaMemGetInfo(
+ &mut free_bytes as *mut usize,
+ &mut total_bytes as *mut usize,
+ );
if result != 0 {
return Err(MahoutError::Cuda(format!(
@@ -65,7 +68,13 @@ fn query_cuda_mem_info() -> Result<(usize, usize)> {
}
#[cfg(target_os = "linux")]
-fn build_oom_message(context: &str, requested_bytes: usize, qubits: Option<usize>, free: usize, total: usize) -> String {
+fn build_oom_message(
+ context: &str,
+ requested_bytes: usize,
+ qubits: Option<usize>,
+ free: usize,
+ total: usize,
+) -> String {
let qubit_hint = qubits
.map(|q| format!(" (qubits={})", q))
.unwrap_or_default();
@@ -83,7 +92,11 @@ fn build_oom_message(context: &str, requested_bytes: usize, qubits: Option<usize
/// Returns a MemoryAllocation error with a helpful message when the request
/// exceeds the currently reported free memory.
#[cfg(target_os = "linux")]
-pub(crate) fn ensure_device_memory_available(requested_bytes: usize, context: &str, qubits: Option<usize>) -> Result<()> {
+pub(crate) fn ensure_device_memory_available(
+ requested_bytes: usize,
+ context: &str,
+ qubits: Option<usize>,
+) -> Result<()> {
let (free, total) = query_cuda_mem_info()?;
if (requested_bytes as u64) > (free as u64) {
@@ -209,24 +222,32 @@ impl GpuStateVector {
{
let requested_bytes = _size_elements
.checked_mul(std::mem::size_of::<CuDoubleComplex>())
- .ok_or_else(|| MahoutError::MemoryAllocation(
- format!("Requested GPU allocation size overflow
(elements={})", _size_elements)
- ))?;
+ .ok_or_else(|| {
+ MahoutError::MemoryAllocation(format!(
+ "Requested GPU allocation size overflow (elements={})",
+ _size_elements
+ ))
+ })?;
// Pre-flight check to gracefully fail before cudaMalloc when OOM
is obvious
- ensure_device_memory_available(requested_bytes, "state vector
allocation", Some(qubits))?;
+ ensure_device_memory_available(
+ requested_bytes,
+ "state vector allocation",
+ Some(qubits),
+ )?;
// Use uninitialized allocation to avoid memory bandwidth waste.
// TODO: Consider using a memory pool for input buffers to avoid
repeated
// cudaMalloc overhead in high-frequency encode() calls.
- let slice = unsafe {
- _device.alloc::<CuDoubleComplex>(_size_elements)
- }.map_err(|e| map_allocation_error(
- requested_bytes,
- "state vector allocation",
- Some(qubits),
- e,
- ))?;
+ let slice =
+ unsafe { _device.alloc::<CuDoubleComplex>(_size_elements)
}.map_err(|e| {
+ map_allocation_error(
+ requested_bytes,
+ "state vector allocation",
+ Some(qubits),
+ e,
+ )
+ })?;
Ok(Self {
buffer: Arc::new(BufferStorage::F64(GpuBufferRaw { slice })),
@@ -240,7 +261,10 @@ impl GpuStateVector {
#[cfg(not(target_os = "linux"))]
{
// Non-Linux: compiles but GPU unavailable
- Err(MahoutError::Cuda("CUDA is only available on Linux. This build
does not support GPU operations.".to_string()))
+ Err(MahoutError::Cuda(
+ "CUDA is only available on Linux. This build does not support
GPU operations."
+ .to_string(),
+ ))
}
}
@@ -276,30 +300,40 @@ impl GpuStateVector {
/// Allocates num_samples * 2^qubits complex numbers on GPU
pub fn new_batch(_device: &Arc<CudaDevice>, num_samples: usize, qubits: usize) -> Result<Self> {
let single_state_size: usize = 1usize << qubits;
- let total_elements = num_samples.checked_mul(single_state_size)
- .ok_or_else(|| MahoutError::MemoryAllocation(
- format!("Batch size overflow: {} samples * {} elements",
num_samples, single_state_size)
- ))?;
+ let total_elements =
num_samples.checked_mul(single_state_size).ok_or_else(|| {
+ MahoutError::MemoryAllocation(format!(
+ "Batch size overflow: {} samples * {} elements",
+ num_samples, single_state_size
+ ))
+ })?;
#[cfg(target_os = "linux")]
{
let requested_bytes = total_elements
.checked_mul(std::mem::size_of::<CuDoubleComplex>())
- .ok_or_else(|| MahoutError::MemoryAllocation(
- format!("Requested GPU allocation size overflow
(elements={})", total_elements)
- ))?;
+ .ok_or_else(|| {
+ MahoutError::MemoryAllocation(format!(
+ "Requested GPU allocation size overflow (elements={})",
+ total_elements
+ ))
+ })?;
// Pre-flight check
- ensure_device_memory_available(requested_bytes, "batch state
vector allocation", Some(qubits))?;
-
- let slice = unsafe {
- _device.alloc::<CuDoubleComplex>(total_elements)
- }.map_err(|e| map_allocation_error(
+ ensure_device_memory_available(
requested_bytes,
"batch state vector allocation",
Some(qubits),
- e,
- ))?;
+ )?;
+
+ let slice =
+ unsafe { _device.alloc::<CuDoubleComplex>(total_elements)
}.map_err(|e| {
+ map_allocation_error(
+ requested_bytes,
+ "batch state vector allocation",
+ Some(qubits),
+ e,
+ )
+ })?;
Ok(Self {
buffer: Arc::new(BufferStorage::F64(GpuBufferRaw { slice })),
@@ -312,7 +346,10 @@ impl GpuStateVector {
#[cfg(not(target_os = "linux"))]
{
- Err(MahoutError::Cuda("CUDA is only available on Linux. This build
does not support GPU operations.".to_string()))
+ Err(MahoutError::Cuda(
+ "CUDA is only available on Linux. This build does not support
GPU operations."
+ .to_string(),
+ ))
}
}
@@ -328,26 +365,38 @@ impl GpuStateVector {
(Precision::Float64, Precision::Float32) => {
#[cfg(target_os = "linux")]
{
- let requested_bytes = self.size_elements
+ let requested_bytes = self
+ .size_elements
.checked_mul(std::mem::size_of::<CuComplex>())
- .ok_or_else(|| MahoutError::MemoryAllocation(
- format!("Requested GPU allocation size overflow
(elements={})", self.size_elements)
- ))?;
-
- ensure_device_memory_available(requested_bytes, "state
vector precision conversion", Some(self.num_qubits))?;
-
- let slice = unsafe {
- device.alloc::<CuComplex>(self.size_elements)
- }.map_err(|e| map_allocation_error(
+ .ok_or_else(|| {
+ MahoutError::MemoryAllocation(format!(
+ "Requested GPU allocation size overflow
(elements={})",
+ self.size_elements
+ ))
+ })?;
+
+ ensure_device_memory_available(
requested_bytes,
"state vector precision conversion",
Some(self.num_qubits),
- e,
- ))?;
-
- let src_ptr = self.ptr_f64().ok_or_else(||
MahoutError::InvalidInput(
- "Source state vector is not Float64; cannot convert to
Float32".to_string()
- ))?;
+ )?;
+
+ let slice =
+ unsafe { device.alloc::<CuComplex>(self.size_elements)
}.map_err(|e| {
+ map_allocation_error(
+ requested_bytes,
+ "state vector precision conversion",
+ Some(self.num_qubits),
+ e,
+ )
+ })?;
+
+ let src_ptr = self.ptr_f64().ok_or_else(|| {
+ MahoutError::InvalidInput(
+ "Source state vector is not Float64; cannot
convert to Float32"
+ .to_string(),
+ )
+ })?;
let ret = unsafe {
qdp_kernels::convert_state_to_float(
@@ -359,13 +408,18 @@ impl GpuStateVector {
};
if ret != 0 {
- return Err(MahoutError::KernelLaunch(
- format!("Precision conversion kernel failed: {}",
ret)
- ));
+ return Err(MahoutError::KernelLaunch(format!(
+ "Precision conversion kernel failed: {}",
+ ret
+ )));
}
- device.synchronize()
- .map_err(|e| MahoutError::Cuda(format!("Failed to sync
after precision conversion: {:?}", e)))?;
+ device.synchronize().map_err(|e| {
+ MahoutError::Cuda(format!(
+ "Failed to sync after precision conversion: {:?}",
+ e
+ ))
+ })?;
Ok(Self {
buffer: Arc::new(BufferStorage::F32(GpuBufferRaw {
slice })),
@@ -378,11 +432,13 @@ impl GpuStateVector {
#[cfg(not(target_os = "linux"))]
{
- Err(MahoutError::Cuda("Precision conversion requires CUDA
(Linux)".to_string()))
+ Err(MahoutError::Cuda(
+ "Precision conversion requires CUDA
(Linux)".to_string(),
+ ))
}
}
_ => Err(MahoutError::NotImplemented(
- "Requested precision conversion is not supported".to_string()
+ "Requested precision conversion is not supported".to_string(),
)),
}
}
@@ -406,17 +462,21 @@ impl PinnedBuffer {
unsafe {
let bytes = elements
.checked_mul(std::mem::size_of::<f64>())
- .ok_or_else(|| MahoutError::MemoryAllocation(
- format!("Requested pinned buffer allocation size overflow
(elements={})", elements)
- ))?;
+ .ok_or_else(|| {
+ MahoutError::MemoryAllocation(format!(
+ "Requested pinned buffer allocation size overflow
(elements={})",
+ elements
+ ))
+ })?;
let mut ptr: *mut c_void = std::ptr::null_mut();
let ret = cudaHostAlloc(&mut ptr, bytes, 0); //
cudaHostAllocDefault
if ret != 0 {
- return Err(MahoutError::MemoryAllocation(
- format!("cudaHostAlloc failed with error code: {}", ret)
- ));
+ return Err(MahoutError::MemoryAllocation(format!(
+ "cudaHostAlloc failed with error code: {}",
+ ret
+ )));
}
Ok(Self {
diff --git a/qdp/qdp-core/src/gpu/mod.rs b/qdp/qdp-core/src/gpu/mod.rs
index 77b41f6e8..c42fe1afe 100644
--- a/qdp/qdp-core/src/gpu/mod.rs
+++ b/qdp/qdp-core/src/gpu/mod.rs
@@ -14,15 +14,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-pub mod memory;
pub mod encodings;
+pub mod memory;
pub mod pipeline;
#[cfg(target_os = "linux")]
pub(crate) mod cuda_ffi;
+pub use encodings::{AmplitudeEncoder, AngleEncoder, BasisEncoder, QuantumEncoder, get_encoder};
pub use memory::GpuStateVector;
-pub use encodings::{QuantumEncoder, AmplitudeEncoder, AngleEncoder, BasisEncoder, get_encoder};
pub use pipeline::run_dual_stream_pipeline;
#[cfg(target_os = "linux")]
diff --git a/qdp/qdp-core/src/gpu/pipeline.rs b/qdp/qdp-core/src/gpu/pipeline.rs
index bbe74c7b0..7d206d1db 100644
--- a/qdp/qdp-core/src/gpu/pipeline.rs
+++ b/qdp/qdp-core/src/gpu/pipeline.rs
@@ -19,17 +19,17 @@
// Provides generic double-buffered execution for large data processing.
// Separates the "streaming mechanics" from the "kernel logic".
-use std::sync::Arc;
-use std::ffi::c_void;
-use cudarc::driver::{CudaDevice, CudaSlice, DevicePtr, safe::CudaStream};
use crate::error::{MahoutError, Result};
#[cfg(target_os = "linux")]
-use crate::gpu::memory::{ensure_device_memory_available, map_allocation_error, PinnedBuffer};
+use crate::gpu::memory::{PinnedBuffer, ensure_device_memory_available, map_allocation_error};
+use cudarc::driver::{CudaDevice, CudaSlice, DevicePtr, safe::CudaStream};
+use std::ffi::c_void;
+use std::sync::Arc;
#[cfg(target_os = "linux")]
use crate::gpu::cuda_ffi::{
- cudaEventCreateWithFlags, cudaEventDestroy, cudaEventRecord, cudaMemcpyAsync, cudaStreamSynchronize,
- cudaStreamWaitEvent, CUDA_EVENT_DISABLE_TIMING, CUDA_MEMCPY_HOST_TO_DEVICE,
+ CUDA_EVENT_DISABLE_TIMING, CUDA_MEMCPY_HOST_TO_DEVICE, cudaEventCreateWithFlags,
+ cudaEventDestroy, cudaEventRecord, cudaMemcpyAsync, cudaStreamSynchronize, cudaStreamWaitEvent,
};
/// Dual-stream pipeline context: manages compute/copy streams and sync events
@@ -44,16 +44,21 @@ pub struct PipelineContext {
impl PipelineContext {
/// Create dual streams and sync event
pub fn new(device: &Arc<CudaDevice>) -> Result<Self> {
- let stream_compute = device.fork_default_stream()
+ let stream_compute = device
+ .fork_default_stream()
.map_err(|e| MahoutError::Cuda(format!("{:?}", e)))?;
- let stream_copy = device.fork_default_stream()
+ let stream_copy = device
+ .fork_default_stream()
.map_err(|e| MahoutError::Cuda(format!("{:?}", e)))?;
let mut event_copy_done: *mut c_void = std::ptr::null_mut();
unsafe {
let ret = cudaEventCreateWithFlags(&mut event_copy_done,
CUDA_EVENT_DISABLE_TIMING);
if ret != 0 {
- return Err(MahoutError::Cuda(format!("Failed to create CUDA
event: {}", ret)));
+ return Err(MahoutError::Cuda(format!(
+ "Failed to create CUDA event: {}",
+ ret
+ )));
}
}
@@ -65,7 +70,12 @@ impl PipelineContext {
}
/// Async H2D copy on copy stream
- pub unsafe fn async_copy_to_device(&self, src: &PinnedBuffer, dst: *mut
c_void, len_elements: usize) {
+ pub unsafe fn async_copy_to_device(
+ &self,
+ src: &PinnedBuffer,
+ dst: *mut c_void,
+ len_elements: usize,
+ ) {
crate::profile_scope!("GPU::H2D_Copy");
unsafe {
cudaMemcpyAsync(
@@ -73,7 +83,7 @@ impl PipelineContext {
src.ptr() as *const c_void,
len_elements * std::mem::size_of::<f64>(),
CUDA_MEMCPY_HOST_TO_DEVICE,
- self.stream_copy.stream as *mut c_void
+ self.stream_copy.stream as *mut c_void,
);
}
}
@@ -89,7 +99,11 @@ impl PipelineContext {
pub unsafe fn wait_for_copy(&self) {
crate::profile_scope!("GPU::StreamWait");
unsafe {
- cudaStreamWaitEvent(self.stream_compute.stream as *mut c_void,
self.event_copy_done, 0);
+ cudaStreamWaitEvent(
+ self.stream_compute.stream as *mut c_void,
+ self.event_copy_done,
+ 0,
+ );
}
}
@@ -158,9 +172,11 @@ where
crate::profile_scope!("GPU::AsyncPipeline");
// 1. Create dual streams for pipeline overlap
- let stream1 = device.fork_default_stream()
+ let stream1 = device
+ .fork_default_stream()
.map_err(|e| MahoutError::Cuda(format!("Failed to create stream 1:
{:?}", e)))?;
- let stream2 = device.fork_default_stream()
+ let stream2 = device
+ .fork_default_stream()
.map_err(|e| MahoutError::Cuda(format!("Failed to create stream 2:
{:?}", e)))?;
let streams = [&stream1, &stream2];
@@ -181,18 +197,13 @@ where
crate::profile_scope!("GPU::ChunkProcess");
- let chunk_bytes = chunk.len() * std::mem::size_of::<f64>();
+ let chunk_bytes = std::mem::size_of_val(chunk);
ensure_device_memory_available(chunk_bytes, "pipeline chunk buffer
allocation", None)?;
// Allocate temporary device buffer for this chunk
- let input_chunk_dev = unsafe {
- device.alloc::<f64>(chunk.len())
- }.map_err(|e| map_allocation_error(
- chunk_bytes,
- "pipeline chunk buffer allocation",
- None,
- e,
- ))?;
+ let input_chunk_dev = unsafe { device.alloc::<f64>(chunk.len())
}.map_err(|e| {
+ map_allocation_error(chunk_bytes, "pipeline chunk buffer
allocation", None, e)
+ })?;
// Async copy: host to device (non-blocking, on specified stream)
// Uses CUDA Runtime API (cudaMemcpyAsync) for true async copy
@@ -201,7 +212,7 @@ where
unsafe {
let dst_device_ptr = *input_chunk_dev.device_ptr() as *mut
c_void;
let src_host_ptr = chunk.as_ptr() as *const c_void;
- let bytes = chunk.len() * std::mem::size_of::<f64>();
+ let bytes = std::mem::size_of_val(chunk);
let stream_handle = current_stream.stream as *mut c_void;
let result = cudaMemcpyAsync(
@@ -213,9 +224,10 @@ where
);
if result != 0 {
- return Err(MahoutError::Cuda(
- format!("Async H2D copy failed with CUDA error: {}",
result)
- ));
+ return Err(MahoutError::Cuda(format!(
+ "Async H2D copy failed with CUDA error: {}",
+ result
+ )));
}
}
}
@@ -242,9 +254,11 @@ where
// This ensures all async copies and kernel launches have finished
{
crate::profile_scope!("GPU::StreamSync");
- device.wait_for(&stream1)
+ device
+ .wait_for(&stream1)
.map_err(|e| MahoutError::Cuda(format!("Stream 1 sync failed:
{:?}", e)))?;
- device.wait_for(&stream2)
+ device
+ .wait_for(&stream2)
.map_err(|e| MahoutError::Cuda(format!("Stream 2 sync failed:
{:?}", e)))?;
}
diff --git a/qdp/qdp-core/src/io.rs b/qdp/qdp-core/src/io.rs
index e7b253626..762d7127d 100644
--- a/qdp/qdp-core/src/io.rs
+++ b/qdp/qdp-core/src/io.rs
@@ -26,11 +26,11 @@ use std::fs::File;
use std::path::Path;
use std::sync::Arc;
-use arrow::array::{Array, ArrayRef, Float64Array, FixedSizeListArray, ListArray, RecordBatch};
+use arrow::array::{Array, ArrayRef, FixedSizeListArray, Float64Array, ListArray, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::ipc::reader::FileReader as ArrowFileReader;
-use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ArrowWriter;
+use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::file::properties::WriterProperties;
use crate::error::{MahoutError, Result};
@@ -96,26 +96,23 @@ pub fn write_parquet<P: AsRef<Path>>(
let array = Float64Array::from_iter_values(data.iter().copied());
let array_ref: ArrayRef = Arc::new(array);
- let batch = RecordBatch::try_new(schema.clone(),
vec![array_ref]).map_err(|e| {
- MahoutError::Io(format!("Failed to create RecordBatch: {}", e))
- })?;
+ let batch = RecordBatch::try_new(schema.clone(), vec![array_ref])
+ .map_err(|e| MahoutError::Io(format!("Failed to create RecordBatch:
{}", e)))?;
- let file = File::create(path.as_ref()).map_err(|e| {
- MahoutError::Io(format!("Failed to create Parquet file: {}", e))
- })?;
+ let file = File::create(path.as_ref())
+ .map_err(|e| MahoutError::Io(format!("Failed to create Parquet file:
{}", e)))?;
let props = WriterProperties::builder().build();
- let mut writer = ArrowWriter::try_new(file, schema,
Some(props)).map_err(|e| {
- MahoutError::Io(format!("Failed to create Parquet writer: {}", e))
- })?;
+ let mut writer = ArrowWriter::try_new(file, schema, Some(props))
+ .map_err(|e| MahoutError::Io(format!("Failed to create Parquet writer:
{}", e)))?;
- writer.write(&batch).map_err(|e| {
- MahoutError::Io(format!("Failed to write Parquet batch: {}", e))
- })?;
+ writer
+ .write(&batch)
+ .map_err(|e| MahoutError::Io(format!("Failed to write Parquet batch:
{}", e)))?;
- writer.close().map_err(|e| {
- MahoutError::Io(format!("Failed to close Parquet writer: {}", e))
- })?;
+ writer
+ .close()
+ .map_err(|e| MahoutError::Io(format!("Failed to close Parquet writer:
{}", e)))?;
Ok(())
}
@@ -124,29 +121,24 @@ pub fn write_parquet<P: AsRef<Path>>(
///
/// Returns one array per row group for zero-copy access.
pub fn read_parquet_to_arrow<P: AsRef<Path>>(path: P) -> Result<Vec<Float64Array>> {
- let file = File::open(path.as_ref()).map_err(|e| {
- MahoutError::Io(format!("Failed to open Parquet file: {}", e))
- })?;
+ let file = File::open(path.as_ref())
+ .map_err(|e| MahoutError::Io(format!("Failed to open Parquet file:
{}", e)))?;
- let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
- MahoutError::Io(format!("Failed to create Parquet reader: {}", e))
- })?;
+ let builder = ParquetRecordBatchReaderBuilder::try_new(file)
+ .map_err(|e| MahoutError::Io(format!("Failed to create Parquet reader:
{}", e)))?;
- let mut reader = builder.build().map_err(|e| {
- MahoutError::Io(format!("Failed to build Parquet reader: {}", e))
- })?;
+ let reader = builder
+ .build()
+ .map_err(|e| MahoutError::Io(format!("Failed to build Parquet reader:
{}", e)))?;
let mut arrays = Vec::new();
- while let Some(batch_result) = reader.next() {
- let batch = batch_result.map_err(|e| {
- MahoutError::Io(format!("Failed to read Parquet batch: {}", e))
- })?;
+ for batch_result in reader {
+ let batch = batch_result
+ .map_err(|e| MahoutError::Io(format!("Failed to read Parquet
batch: {}", e)))?;
if batch.num_columns() == 0 {
- return Err(MahoutError::Io(
- "Parquet file has no columns".to_string(),
- ));
+ return Err(MahoutError::Io("Parquet file has no
columns".to_string()));
}
let column = batch.column(0);
@@ -160,18 +152,14 @@ pub fn read_parquet_to_arrow<P: AsRef<Path>>(path: P) -> Result<Vec<Float64Array
let float_array = column
.as_any()
.downcast_ref::<Float64Array>()
- .ok_or_else(|| {
- MahoutError::Io("Failed to downcast to
Float64Array".to_string())
- })?
+ .ok_or_else(|| MahoutError::Io("Failed to downcast to
Float64Array".to_string()))?
.clone();
arrays.push(float_array);
}
if arrays.is_empty() {
- return Err(MahoutError::Io(
- "Parquet file contains no data".to_string(),
- ));
+ return Err(MahoutError::Io("Parquet file contains no
data".to_string()));
}
Ok(arrays)
@@ -203,26 +191,23 @@ pub fn write_arrow_to_parquet<P: AsRef<Path>>(
)]));
let array_ref: ArrayRef = Arc::new(array.clone());
- let batch = RecordBatch::try_new(schema.clone(),
vec![array_ref]).map_err(|e| {
- MahoutError::Io(format!("Failed to create RecordBatch: {}", e))
- })?;
+ let batch = RecordBatch::try_new(schema.clone(), vec![array_ref])
+ .map_err(|e| MahoutError::Io(format!("Failed to create RecordBatch:
{}", e)))?;
- let file = File::create(path.as_ref()).map_err(|e| {
- MahoutError::Io(format!("Failed to create Parquet file: {}", e))
- })?;
+ let file = File::create(path.as_ref())
+ .map_err(|e| MahoutError::Io(format!("Failed to create Parquet file:
{}", e)))?;
let props = WriterProperties::builder().build();
- let mut writer = ArrowWriter::try_new(file, schema,
Some(props)).map_err(|e| {
- MahoutError::Io(format!("Failed to create Parquet writer: {}", e))
- })?;
+ let mut writer = ArrowWriter::try_new(file, schema, Some(props))
+ .map_err(|e| MahoutError::Io(format!("Failed to create Parquet writer:
{}", e)))?;
- writer.write(&batch).map_err(|e| {
- MahoutError::Io(format!("Failed to write Parquet batch: {}", e))
- })?;
+ writer
+ .write(&batch)
+ .map_err(|e| MahoutError::Io(format!("Failed to write Parquet batch:
{}", e)))?;
- writer.close().map_err(|e| {
- MahoutError::Io(format!("Failed to close Parquet writer: {}", e))
- })?;
+ writer
+ .close()
+ .map_err(|e| MahoutError::Io(format!("Failed to close Parquet writer:
{}", e)))?;
Ok(())
}
@@ -237,28 +222,25 @@ pub fn write_arrow_to_parquet<P: AsRef<Path>>(
/// # TODO
/// Add OOM protection for very large files
pub fn read_parquet_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize, usize)> {
- let file = File::open(path.as_ref()).map_err(|e| {
- MahoutError::Io(format!("Failed to open Parquet file: {}", e))
- })?;
+ let file = File::open(path.as_ref())
+ .map_err(|e| MahoutError::Io(format!("Failed to open Parquet file:
{}", e)))?;
- let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
- MahoutError::Io(format!("Failed to create Parquet reader: {}", e))
- })?;
+ let builder = ParquetRecordBatchReaderBuilder::try_new(file)
+ .map_err(|e| MahoutError::Io(format!("Failed to create Parquet reader:
{}", e)))?;
let total_rows = builder.metadata().file_metadata().num_rows() as usize;
- let mut reader = builder.build().map_err(|e| {
- MahoutError::Io(format!("Failed to build Parquet reader: {}", e))
- })?;
+ let reader = builder
+ .build()
+ .map_err(|e| MahoutError::Io(format!("Failed to build Parquet reader:
{}", e)))?;
let mut all_data = Vec::new();
let mut num_samples = 0;
let mut sample_size = None;
- while let Some(batch_result) = reader.next() {
- let batch = batch_result.map_err(|e| {
- MahoutError::Io(format!("Failed to read Parquet batch: {}", e))
- })?;
+ for batch_result in reader {
+ let batch = batch_result
+ .map_err(|e| MahoutError::Io(format!("Failed to read Parquet
batch: {}", e)))?;
if batch.num_columns() == 0 {
return Err(MahoutError::Io("Parquet file has no
columns".to_string()));
@@ -309,9 +291,8 @@ pub fn read_parquet_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize, u
}
}
- let sample_size = sample_size.ok_or_else(|| {
- MahoutError::Io("Parquet file contains no data".to_string())
- })?;
+ let sample_size =
+ sample_size.ok_or_else(|| MahoutError::Io("Parquet file contains no
data".to_string()))?;
Ok((all_data, num_samples, sample_size))
}
@@ -327,22 +308,19 @@ pub fn read_parquet_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize, u
/// # TODO
/// Add OOM protection for very large files
pub fn read_arrow_ipc_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize, usize)> {
- let file = File::open(path.as_ref()).map_err(|e| {
- MahoutError::Io(format!("Failed to open Arrow IPC file: {}", e))
- })?;
+ let file = File::open(path.as_ref())
+ .map_err(|e| MahoutError::Io(format!("Failed to open Arrow IPC file:
{}", e)))?;
- let reader = ArrowFileReader::try_new(file, None).map_err(|e| {
- MahoutError::Io(format!("Failed to create Arrow IPC reader: {}", e))
- })?;
+ let reader = ArrowFileReader::try_new(file, None)
+ .map_err(|e| MahoutError::Io(format!("Failed to create Arrow IPC
reader: {}", e)))?;
let mut all_data = Vec::new();
let mut num_samples = 0;
let mut sample_size: Option<usize> = None;
for batch_result in reader {
- let batch = batch_result.map_err(|e| {
- MahoutError::Io(format!("Failed to read Arrow batch: {}", e))
- })?;
+ let batch = batch_result
+ .map_err(|e| MahoutError::Io(format!("Failed to read Arrow batch:
{}", e)))?;
if batch.num_columns() == 0 {
return Err(MahoutError::Io("Arrow file has no
columns".to_string()));
@@ -355,7 +333,9 @@ pub fn read_arrow_ipc_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize,
let list_array = column
.as_any()
.downcast_ref::<FixedSizeListArray>()
- .ok_or_else(|| MahoutError::Io("Failed to downcast to
FixedSizeListArray".to_string()))?;
+ .ok_or_else(|| {
+ MahoutError::Io("Failed to downcast to
FixedSizeListArray".to_string())
+ })?;
let current_size = *size as usize;
@@ -387,17 +367,18 @@ pub fn read_arrow_ipc_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize,
}
DataType::List(_) => {
- let list_array = column
- .as_any()
- .downcast_ref::<ListArray>()
- .ok_or_else(|| MahoutError::Io("Failed to downcast to
ListArray".to_string()))?;
+ let list_array =
column.as_any().downcast_ref::<ListArray>().ok_or_else(|| {
+ MahoutError::Io("Failed to downcast to
ListArray".to_string())
+ })?;
for i in 0..list_array.len() {
let value_array = list_array.value(i);
let float_array = value_array
.as_any()
.downcast_ref::<Float64Array>()
- .ok_or_else(|| MahoutError::Io("List values must be
Float64".to_string()))?;
+ .ok_or_else(|| {
+ MahoutError::Io("List values must be
Float64".to_string())
+ })?;
let current_size = float_array.len();
@@ -432,9 +413,8 @@ pub fn read_arrow_ipc_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize,
}
}
- let sample_size = sample_size.ok_or_else(|| {
- MahoutError::Io("Arrow file contains no data".to_string())
- })?;
+ let sample_size =
+ sample_size.ok_or_else(|| MahoutError::Io("Arrow file contains no
data".to_string()))?;
Ok((all_data, num_samples, sample_size))
}
@@ -458,13 +438,11 @@ impl ParquetBlockReader {
/// * `path` - Path to the Parquet file
/// * `batch_size` - Optional batch size (defaults to 2048)
pub fn new<P: AsRef<Path>>(path: P, batch_size: Option<usize>) ->
Result<Self> {
- let file = File::open(path.as_ref()).map_err(|e| {
- MahoutError::Io(format!("Failed to open Parquet file: {}", e))
- })?;
+ let file = File::open(path.as_ref())
+ .map_err(|e| MahoutError::Io(format!("Failed to open Parquet file:
{}", e)))?;
- let builder =
ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
- MahoutError::Io(format!("Failed to create Parquet reader: {}", e))
- })?;
+ let builder = ParquetRecordBatchReaderBuilder::try_new(file)
+ .map_err(|e| MahoutError::Io(format!("Failed to create Parquet
reader: {}", e)))?;
let schema = builder.schema();
if schema.fields().len() != 1 {
@@ -506,9 +484,7 @@ impl ParquetBlockReader {
let reader = builder
.with_batch_size(batch_size)
.build()
- .map_err(|e| {
- MahoutError::Io(format!("Failed to build Parquet reader: {}",
e))
- })?;
+ .map_err(|e| MahoutError::Io(format!("Failed to build Parquet
reader: {}", e)))?;
Ok(Self {
reader,
@@ -547,8 +523,8 @@ impl ParquetBlockReader {
let to_copy = std::cmp::min(available, space_left);
if to_copy > 0 {
- buffer[written..written+to_copy].copy_from_slice(
-
&self.leftover_data[self.leftover_cursor..self.leftover_cursor+to_copy]
+ buffer[written..written + to_copy].copy_from_slice(
+
&self.leftover_data[self.leftover_cursor..self.leftover_cursor + to_copy],
);
written += to_copy;
self.leftover_cursor += to_copy;
@@ -574,10 +550,10 @@ impl ParquetBlockReader {
let (current_sample_size, batch_values) = match
column.data_type() {
DataType::List(_) => {
- let list_array = column
- .as_any()
- .downcast_ref::<ListArray>()
- .ok_or_else(|| MahoutError::Io("Failed to
downcast to ListArray".to_string()))?;
+ let list_array =
+
column.as_any().downcast_ref::<ListArray>().ok_or_else(|| {
+ MahoutError::Io("Failed to downcast to
ListArray".to_string())
+ })?;
if list_array.len() == 0 {
continue;
@@ -590,7 +566,9 @@ impl ParquetBlockReader {
let float_array = value_array
.as_any()
.downcast_ref::<Float64Array>()
- .ok_or_else(|| MahoutError::Io("List
values must be Float64".to_string()))?;
+ .ok_or_else(|| {
+ MahoutError::Io("List values must be
Float64".to_string())
+ })?;
if i == 0 {
current_sample_size =
Some(float_array.len());
@@ -603,13 +581,21 @@ impl ParquetBlockReader {
}
}
- (current_sample_size.expect("list_array.len() > 0
ensures at least one element"), batch_values)
+ (
+ current_sample_size
+ .expect("list_array.len() > 0 ensures at
least one element"),
+ batch_values,
+ )
}
DataType::FixedSizeList(_, size) => {
let list_array = column
.as_any()
.downcast_ref::<FixedSizeListArray>()
- .ok_or_else(|| MahoutError::Io("Failed to
downcast to FixedSizeListArray".to_string()))?;
+ .ok_or_else(|| {
+ MahoutError::Io(
+ "Failed to downcast to
FixedSizeListArray".to_string(),
+ )
+ })?;
if list_array.len() == 0 {
continue;
@@ -621,7 +607,11 @@ impl ParquetBlockReader {
let float_array = values
.as_any()
.downcast_ref::<Float64Array>()
- .ok_or_else(|| MahoutError::Io("FixedSizeList
values must be Float64".to_string()))?;
+ .ok_or_else(|| {
+ MahoutError::Io(
+ "FixedSizeList values must be
Float64".to_string(),
+ )
+ })?;
let mut batch_values = Vec::new();
if float_array.null_count() == 0 {
@@ -643,34 +633,34 @@ impl ParquetBlockReader {
if self.sample_size.is_none() {
self.sample_size = Some(current_sample_size);
limit = calc_limit(current_sample_size);
- } else {
- if let Some(expected_size) = self.sample_size {
- if current_sample_size != expected_size {
- return Err(MahoutError::InvalidInput(format!(
- "Inconsistent sample sizes: expected {},
got {}",
- expected_size, current_sample_size
- )));
- }
- }
+ } else if let Some(expected_size) = self.sample_size
+ && current_sample_size != expected_size
+ {
+ return Err(MahoutError::InvalidInput(format!(
+ "Inconsistent sample sizes: expected {}, got {}",
+ expected_size, current_sample_size
+ )));
}
let available = batch_values.len();
let space_left = limit - written;
if available <= space_left {
-
buffer[written..written+available].copy_from_slice(&batch_values);
+ buffer[written..written +
available].copy_from_slice(&batch_values);
written += available;
} else {
if space_left > 0 {
-
buffer[written..written+space_left].copy_from_slice(&batch_values[0..space_left]);
+ buffer[written..written + space_left]
+ .copy_from_slice(&batch_values[0..space_left]);
written += space_left;
}
self.leftover_data.clear();
-
self.leftover_data.extend_from_slice(&batch_values[space_left..]);
+ self.leftover_data
+ .extend_from_slice(&batch_values[space_left..]);
self.leftover_cursor = 0;
break;
}
- },
+ }
Some(Err(e)) => return Err(MahoutError::Io(format!("Parquet
read error: {}", e))),
None => break,
}
diff --git a/qdp/qdp-core/src/lib.rs b/qdp/qdp-core/src/lib.rs
index 1a1d1b320..a70748868 100644
--- a/qdp/qdp-core/src/lib.rs
+++ b/qdp/qdp-core/src/lib.rs
@@ -15,33 +15,33 @@
// limitations under the License.
pub mod dlpack;
-pub mod gpu;
pub mod error;
-pub mod preprocessing;
+pub mod gpu;
pub mod io;
+pub mod preprocessing;
#[macro_use]
mod profiling;
pub use error::{MahoutError, Result};
pub use gpu::memory::Precision;
-use std::sync::Arc;
#[cfg(target_os = "linux")]
use std::ffi::c_void;
+use std::sync::Arc;
#[cfg(target_os = "linux")]
-use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
+use std::sync::mpsc::{Receiver, SyncSender, sync_channel};
#[cfg(target_os = "linux")]
use std::thread;
-use cudarc::driver::{CudaDevice, DevicePtr, DevicePtrMut};
use crate::dlpack::DLManagedTensor;
-use crate::gpu::get_encoder;
-#[cfg(target_os = "linux")]
-use crate::gpu::memory::{PinnedBuffer, GpuStateVector};
#[cfg(target_os = "linux")]
use crate::gpu::PipelineContext;
+use crate::gpu::get_encoder;
#[cfg(target_os = "linux")]
-use qdp_kernels::{launch_l2_norm_batch, launch_amplitude_encode_batch};
+use crate::gpu::memory::{GpuStateVector, PinnedBuffer};
+use cudarc::driver::{CudaDevice, DevicePtr, DevicePtrMut};
+#[cfg(target_os = "linux")]
+use qdp_kernels::{launch_amplitude_encode_batch, launch_l2_norm_batch};
/// 512MB staging buffer for large Parquet row groups (reduces fragmentation)
#[cfg(target_os = "linux")]
@@ -69,10 +69,14 @@ impl QdpEngine {
/// Initialize engine with explicit precision.
pub fn new_with_precision(device_id: usize, precision: Precision) ->
Result<Self> {
- let device = CudaDevice::new(device_id)
- .map_err(|e| MahoutError::Cuda(format!("Failed to initialize CUDA
device {}: {:?}", device_id, e)))?;
+ let device = CudaDevice::new(device_id).map_err(|e| {
+ MahoutError::Cuda(format!(
+ "Failed to initialize CUDA device {}: {:?}",
+ device_id, e
+ ))
+ })?;
Ok(Self {
- device, // CudaDevice::new already returns Arc<CudaDevice> in
cudarc 0.11
+ device, // CudaDevice::new already returns Arc<CudaDevice> in
cudarc 0.11
precision,
})
}
@@ -175,13 +179,16 @@ impl QdpEngine {
#[cfg(target_os = "linux")]
{
if encoding_method != "amplitude" {
- return Err(MahoutError::NotImplemented("Only amplitude
encoding supported for streaming".into()));
+ return Err(MahoutError::NotImplemented(
+ "Only amplitude encoding supported for streaming".into(),
+ ));
}
let mut reader_core = crate::io::ParquetBlockReader::new(path,
None)?;
let num_samples = reader_core.total_rows;
- let total_state_vector = GpuStateVector::new_batch(&self.device,
num_samples, num_qubits)?;
+ let total_state_vector =
+ GpuStateVector::new_batch(&self.device, num_samples,
num_qubits)?;
let ctx = PipelineContext::new(&self.device)?;
let dev_in_a = unsafe {
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
@@ -189,17 +196,24 @@ impl QdpEngine {
let dev_in_b = unsafe {
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
.map_err(|e| MahoutError::MemoryAllocation(format!("{:?}",
e)))?;
- let (full_buf_tx, full_buf_rx):
(SyncSender<std::result::Result<(PinnedBuffer, usize), MahoutError>>,
Receiver<std::result::Result<(PinnedBuffer, usize), MahoutError>>) =
sync_channel(2);
- let (empty_buf_tx, empty_buf_rx): (SyncSender<PinnedBuffer>,
Receiver<PinnedBuffer>) = sync_channel(2);
+ let (full_buf_tx, full_buf_rx): (
+ SyncSender<std::result::Result<(PinnedBuffer, usize),
MahoutError>>,
+ Receiver<std::result::Result<(PinnedBuffer, usize),
MahoutError>>,
+ ) = sync_channel(2);
+ let (empty_buf_tx, empty_buf_rx): (SyncSender<PinnedBuffer>,
Receiver<PinnedBuffer>) =
+ sync_channel(2);
let mut host_buf_first = PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?;
let first_len =
reader_core.read_chunk(host_buf_first.as_slice_mut())?;
- let sample_size = reader_core.get_sample_size()
- .ok_or_else(|| MahoutError::InvalidInput("Could not determine
sample size".into()))?;
+ let sample_size = reader_core.get_sample_size().ok_or_else(|| {
+ MahoutError::InvalidInput("Could not determine sample
size".into())
+ })?;
if sample_size == 0 {
- return Err(MahoutError::InvalidInput("Sample size cannot be
zero".into()));
+ return Err(MahoutError::InvalidInput(
+ "Sample size cannot be zero".into(),
+ ));
}
if sample_size > STAGE_SIZE_ELEMENTS {
return Err(MahoutError::InvalidInput(format!(
@@ -217,16 +231,22 @@ impl QdpEngine {
1,
std::cmp::min(num_samples, STAGE_SIZE_ELEMENTS / sample_size),
);
- let mut norm_buffer =
self.device.alloc_zeros::<f64>(max_samples_per_chunk)
- .map_err(|e| MahoutError::MemoryAllocation(format!(
- "Failed to allocate norm buffer: {:?}",
- e
- )))?;
-
- full_buf_tx.send(Ok((host_buf_first, first_len)))
+ let mut norm_buffer = self
+ .device
+ .alloc_zeros::<f64>(max_samples_per_chunk)
+ .map_err(|e| {
+ MahoutError::MemoryAllocation(format!(
+ "Failed to allocate norm buffer: {:?}",
+ e
+ ))
+ })?;
+
+ full_buf_tx
+ .send(Ok((host_buf_first, first_len)))
.map_err(|_| MahoutError::Io("Failed to send first
buffer".into()))?;
- empty_buf_tx.send(PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?)
+ empty_buf_tx
+ .send(PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?)
.map_err(|_| MahoutError::Io("Failed to send second
buffer".into()))?;
let mut reader = reader_core;
@@ -237,16 +257,22 @@ impl QdpEngine {
Err(_) => break,
};
- let result =
reader.read_chunk(buffer.as_slice_mut()).map(|len| (buffer, len));
+ let result = reader
+ .read_chunk(buffer.as_slice_mut())
+ .map(|len| (buffer, len));
let should_break = match &result {
Ok((_, len)) => *len == 0,
Err(_) => true,
};
- if full_buf_tx.send(result).is_err() { break; }
+ if full_buf_tx.send(result).is_err() {
+ break;
+ }
- if should_break { break; }
+ if should_break {
+ break;
+ }
}
});
@@ -261,7 +287,9 @@ impl QdpEngine {
Err(_) => return Err(MahoutError::Io("IO thread
disconnected".into())),
};
- if current_len == 0 { break; }
+ if current_len == 0 {
+ break;
+ }
if current_len % sample_size != 0 {
return Err(MahoutError::InvalidInput(format!(
@@ -272,7 +300,11 @@ impl QdpEngine {
let samples_in_chunk = current_len / sample_size;
if samples_in_chunk > 0 {
- let dev_ptr = if use_dev_a { *dev_in_a.device_ptr() } else
{ *dev_in_b.device_ptr() };
+ let dev_ptr = if use_dev_a {
+ *dev_in_a.device_ptr()
+ } else {
+ *dev_in_b.device_ptr()
+ };
unsafe {
crate::profile_scope!("GPU::Dispatch");
@@ -285,17 +317,26 @@ impl QdpEngine {
crate::profile_scope!("GPU::BatchEncode");
let offset_elements = global_sample_offset
.checked_mul(state_len_per_sample)
- .ok_or_else(|| MahoutError::MemoryAllocation(
- format!("Offset calculation overflow: {} *
{}", global_sample_offset, state_len_per_sample)
- ))?;
+ .ok_or_else(|| {
+ MahoutError::MemoryAllocation(format!(
+ "Offset calculation overflow: {} * {}",
+ global_sample_offset,
state_len_per_sample
+ ))
+ })?;
let offset_bytes = offset_elements
.checked_mul(std::mem::size_of::<qdp_kernels::CuDoubleComplex>())
- .ok_or_else(|| MahoutError::MemoryAllocation(
- format!("Offset bytes calculation
overflow: {} * {}", offset_elements,
std::mem::size_of::<qdp_kernels::CuDoubleComplex>())
- ))?;
-
- let state_ptr_offset =
total_state_vector.ptr_void().cast::<u8>()
+ .ok_or_else(|| {
+ MahoutError::MemoryAllocation(format!(
+ "Offset bytes calculation overflow: {}
* {}",
+ offset_elements,
+
std::mem::size_of::<qdp_kernels::CuDoubleComplex>()
+ ))
+ })?;
+
+ let state_ptr_offset = total_state_vector
+ .ptr_void()
+ .cast::<u8>()
.add(offset_bytes)
.cast::<std::ffi::c_void>();
debug_assert!(
@@ -310,10 +351,13 @@ impl QdpEngine {
samples_in_chunk,
sample_size,
*norm_buffer.device_ptr_mut() as *mut f64,
- ctx.stream_compute.stream as *mut c_void
+ ctx.stream_compute.stream as *mut c_void,
);
if ret != 0 {
- return
Err(MahoutError::KernelLaunch(format!("Norm kernel error: {}", ret)));
+ return
Err(MahoutError::KernelLaunch(format!(
+ "Norm kernel error: {}",
+ ret
+ )));
}
}
@@ -326,10 +370,13 @@ impl QdpEngine {
samples_in_chunk,
sample_size,
state_len_per_sample,
- ctx.stream_compute.stream as *mut c_void
+ ctx.stream_compute.stream as *mut c_void,
);
if ret != 0 {
- return
Err(MahoutError::KernelLaunch(format!("Encode kernel error: {}", ret)));
+ return
Err(MahoutError::KernelLaunch(format!(
+ "Encode kernel error: {}",
+ ret
+ )));
}
}
}
@@ -338,17 +385,24 @@ impl QdpEngine {
}
global_sample_offset = global_sample_offset
.checked_add(samples_in_chunk)
- .ok_or_else(|| MahoutError::MemoryAllocation(
- format!("Sample offset overflow: {} + {}",
global_sample_offset, samples_in_chunk)
- ))?;
+ .ok_or_else(|| {
+ MahoutError::MemoryAllocation(format!(
+ "Sample offset overflow: {} + {}",
+ global_sample_offset, samples_in_chunk
+ ))
+ })?;
use_dev_a = !use_dev_a;
}
let _ = empty_buf_tx.send(host_buffer);
}
- self.device.synchronize().map_err(|e|
MahoutError::Cuda(format!("{:?}", e)))?;
- io_handle.join().map_err(|e| MahoutError::Io(format!("IO thread
panicked: {:?}", e)))?;
+ self.device
+ .synchronize()
+ .map_err(|e| MahoutError::Cuda(format!("{:?}", e)))?;
+ io_handle
+ .join()
+ .map_err(|e| MahoutError::Io(format!("IO thread panicked:
{:?}", e)))?;
let dlpack_ptr = total_state_vector.to_dlpack();
Ok(dlpack_ptr)
@@ -357,7 +411,13 @@ impl QdpEngine {
#[cfg(not(target_os = "linux"))]
{
let (batch_data, num_samples, sample_size) =
crate::io::read_parquet_batch(path)?;
- self.encode_batch(&batch_data, num_samples, sample_size,
num_qubits, encoding_method)
+ self.encode_batch(
+ &batch_data,
+ num_samples,
+ sample_size,
+ num_qubits,
+ encoding_method,
+ )
}
}
@@ -387,7 +447,13 @@ impl QdpEngine {
crate::io::read_arrow_ipc_batch(path)?
};
- self.encode_batch(&batch_data, num_samples, sample_size, num_qubits,
encoding_method)
+ self.encode_batch(
+ &batch_data,
+ num_samples,
+ sample_size,
+ num_qubits,
+ encoding_method,
+ )
}
}
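
The streaming path above overlaps Parquet reads with GPU work by circulating two pinned host buffers between an IO thread and the encode loop over a pair of bounded channels. A minimal sketch of that double-buffering pattern, with plain Vec<f64> standing in for PinnedBuffer and a print standing in for the CUDA dispatch (the names here are illustrative, not the qdp-core API):

    use std::sync::mpsc::sync_channel;
    use std::thread;

    fn main() {
        // "Full" buffers flow to the consumer; "empty" buffers flow back for reuse.
        let (full_tx, full_rx) = sync_channel::<Vec<f64>>(2);
        let (empty_tx, empty_rx) = sync_channel::<Vec<f64>>(2);

        // Seed the pool with two buffers (the code above seeds one full, one empty).
        empty_tx.send(vec![0.0; 4]).unwrap();
        empty_tx.send(vec![0.0; 4]).unwrap();

        let producer = thread::spawn(move || {
            for chunk in 0..3 {
                let mut buf = match empty_rx.recv() {
                    Ok(b) => b,
                    Err(_) => break, // consumer went away
                };
                buf.fill(chunk as f64); // stand-in for reader.read_chunk(...)
                if full_tx.send(buf).is_err() {
                    break;
                }
            }
            // Dropping full_tx here is what ends the consumer loop below.
        });

        while let Ok(buf) = full_rx.recv() {
            println!("dispatch chunk starting with {}", buf[0]); // stand-in for the GPU kernels
            let _ = empty_tx.send(buf); // return the buffer to the producer
        }
        producer.join().unwrap();
    }

In the real pipeline the "full" channel carries Result<(PinnedBuffer, usize), MahoutError> so read errors propagate into the encode loop, and a zero-length chunk signals end of input.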
diff --git a/qdp/qdp-core/src/preprocessing.rs
b/qdp/qdp-core/src/preprocessing.rs
index 43577a8eb..c790febf2 100644
--- a/qdp/qdp-core/src/preprocessing.rs
+++ b/qdp/qdp-core/src/preprocessing.rs
@@ -14,8 +14,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use rayon::prelude::*;
use crate::error::{MahoutError, Result};
+use rayon::prelude::*;
/// Shared CPU-based pre-processing pipeline for quantum encoding.
///
@@ -34,27 +34,30 @@ impl Preprocessor {
// Validate qubits (max 30 = 16GB GPU memory)
if num_qubits == 0 {
return Err(MahoutError::InvalidInput(
- "Number of qubits must be at least 1".to_string()
+ "Number of qubits must be at least 1".to_string(),
));
}
if num_qubits > 30 {
- return Err(MahoutError::InvalidInput(
- format!("Number of qubits {} exceeds practical limit of 30",
num_qubits)
- ));
+ return Err(MahoutError::InvalidInput(format!(
+ "Number of qubits {} exceeds practical limit of 30",
+ num_qubits
+ )));
}
// Validate input data
if host_data.is_empty() {
return Err(MahoutError::InvalidInput(
- "Input data cannot be empty".to_string()
+ "Input data cannot be empty".to_string(),
));
}
let state_len = 1 << num_qubits;
if host_data.len() > state_len {
- return Err(MahoutError::InvalidInput(
- format!("Input data length {} exceeds state vector size {}",
host_data.len(), state_len)
- ));
+ return Err(MahoutError::InvalidInput(format!(
+ "Input data length {} exceeds state vector size {}",
+ host_data.len(),
+ state_len
+ )));
}
Ok(())
@@ -71,7 +74,9 @@ impl Preprocessor {
};
if norm == 0.0 {
- return Err(MahoutError::InvalidInput("Input data has zero
norm".to_string()));
+ return Err(MahoutError::InvalidInput(
+ "Input data has zero norm".to_string(),
+ ));
}
Ok(norm)
@@ -86,28 +91,32 @@ impl Preprocessor {
) -> Result<()> {
if num_samples == 0 {
return Err(MahoutError::InvalidInput(
- "num_samples must be greater than 0".to_string()
+ "num_samples must be greater than 0".to_string(),
));
}
if batch_data.len() != num_samples * sample_size {
- return Err(MahoutError::InvalidInput(
- format!("Batch data length {} doesn't match num_samples {} *
sample_size {}",
- batch_data.len(), num_samples, sample_size)
- ));
+ return Err(MahoutError::InvalidInput(format!(
+ "Batch data length {} doesn't match num_samples {} *
sample_size {}",
+ batch_data.len(),
+ num_samples,
+ sample_size
+ )));
}
if num_qubits == 0 || num_qubits > 30 {
- return Err(MahoutError::InvalidInput(
- format!("Number of qubits {} must be between 1 and 30",
num_qubits)
- ));
+ return Err(MahoutError::InvalidInput(format!(
+ "Number of qubits {} must be between 1 and 30",
+ num_qubits
+ )));
}
let state_len = 1 << num_qubits;
if sample_size > state_len {
- return Err(MahoutError::InvalidInput(
- format!("Sample size {} exceeds state vector size {}",
sample_size, state_len)
- ));
+ return Err(MahoutError::InvalidInput(format!(
+ "Sample size {} exceeds state vector size {}",
+ sample_size, state_len
+ )));
}
Ok(())
@@ -129,9 +138,10 @@ impl Preprocessor {
let norm_sq: f64 = sample.iter().map(|&x| x * x).sum();
let norm = norm_sq.sqrt();
if norm == 0.0 {
- return Err(MahoutError::InvalidInput(
- format!("Sample {} has zero norm", i)
- ));
+ return Err(MahoutError::InvalidInput(format!(
+ "Sample {} has zero norm",
+ i
+ )));
}
Ok(norm)
})
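
The batch path above computes one L2 norm per sample (the file pulls in rayon's prelude for the parallel pass) and rejects zero-norm samples. Condensed into a standalone sketch, with plain String errors standing in for MahoutError:

    use rayon::prelude::*;

    /// Per-sample L2 norms over a flattened batch; a zero norm is an error,
    /// mirroring the checks reformatted above.
    fn sample_norms(batch: &[f64], sample_size: usize) -> Result<Vec<f64>, String> {
        batch
            .par_chunks(sample_size)
            .enumerate()
            .map(|(i, sample)| {
                let norm = sample.iter().map(|&x| x * x).sum::<f64>().sqrt();
                if norm == 0.0 {
                    Err(format!("Sample {} has zero norm", i))
                } else {
                    Ok(norm)
                }
            })
            .collect()
    }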
diff --git a/qdp/qdp-core/src/profiling.rs b/qdp/qdp-core/src/profiling.rs
index 0b60e6c82..832bfdc40 100644
--- a/qdp/qdp-core/src/profiling.rs
+++ b/qdp/qdp-core/src/profiling.rs
@@ -14,7 +14,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-
// Zero-cost profiling macros for NVTX integration
//
// Provides clean abstraction over NVTX markers without cluttering business
logic.
diff --git a/qdp/qdp-core/tests/api_workflow.rs
b/qdp/qdp-core/tests/api_workflow.rs
index 0973d27d5..ff7f0d77c 100644
--- a/qdp/qdp-core/tests/api_workflow.rs
+++ b/qdp/qdp-core/tests/api_workflow.rs
@@ -30,7 +30,10 @@ fn test_engine_initialization() {
match engine {
Ok(_) => println!("PASS: Engine initialized successfully"),
Err(e) => {
- println!("SKIP: CUDA initialization failed (no GPU available):
{:?}", e);
+ println!(
+ "SKIP: CUDA initialization failed (no GPU available): {:?}",
+ e
+ );
return;
}
}
@@ -65,7 +68,10 @@ fn test_amplitude_encoding_workflow() {
assert!(managed.deleter.is_some(), "Deleter must be present");
println!("Calling deleter to free GPU memory");
- let deleter = managed.deleter.take().expect("Deleter function pointer
is missing!");
+ let deleter = managed
+ .deleter
+ .take()
+ .expect("Deleter function pointer is missing!");
deleter(dlpack_ptr);
println!("PASS: Memory freed successfully");
}
@@ -98,7 +104,10 @@ fn test_amplitude_encoding_async_pipeline() {
assert!(managed.deleter.is_some(), "Deleter must be present");
println!("Calling deleter to free GPU memory");
- let deleter = managed.deleter.take().expect("Deleter function pointer
is missing!");
+ let deleter = managed
+ .deleter
+ .take()
+ .expect("Deleter function pointer is missing!");
deleter(dlpack_ptr);
println!("PASS: Memory freed successfully");
}
@@ -125,7 +134,13 @@ fn test_batch_dlpack_2d_shape() {
.map(|i| (i as f64) / 10.0)
.collect();
- let result = engine.encode_batch(&batch_data, num_samples, sample_size,
num_qubits, "amplitude");
+ let result = engine.encode_batch(
+ &batch_data,
+ num_samples,
+ sample_size,
+ num_qubits,
+ "amplitude",
+ );
let dlpack_ptr = result.expect("Batch encoding should succeed");
assert!(!dlpack_ptr.is_null(), "DLPack pointer should not be null");
@@ -137,16 +152,35 @@ fn test_batch_dlpack_2d_shape() {
assert_eq!(tensor.ndim, 2, "Batch tensor should be 2D");
let shape_slice = std::slice::from_raw_parts(tensor.shape, tensor.ndim
as usize);
- assert_eq!(shape_slice[0], num_samples as i64, "First dimension should
be num_samples");
- assert_eq!(shape_slice[1], (1 << num_qubits) as i64, "Second dimension
should be 2^num_qubits");
+ assert_eq!(
+ shape_slice[0], num_samples as i64,
+ "First dimension should be num_samples"
+ );
+ assert_eq!(
+ shape_slice[1],
+ (1 << num_qubits) as i64,
+ "Second dimension should be 2^num_qubits"
+ );
let strides_slice = std::slice::from_raw_parts(tensor.strides,
tensor.ndim as usize);
let state_len = 1 << num_qubits;
- assert_eq!(strides_slice[0], state_len as i64, "Stride for first
dimension should be state_len");
- assert_eq!(strides_slice[1], 1, "Stride for second dimension should be
1");
+ assert_eq!(
+ strides_slice[0], state_len as i64,
+ "Stride for first dimension should be state_len"
+ );
+ assert_eq!(
+ strides_slice[1], 1,
+ "Stride for second dimension should be 1"
+ );
- println!("PASS: Batch DLPack tensor has correct 2D shape: [{}, {}]",
shape_slice[0], shape_slice[1]);
- println!("PASS: Strides are correct: [{}, {}]", strides_slice[0],
strides_slice[1]);
+ println!(
+ "PASS: Batch DLPack tensor has correct 2D shape: [{}, {}]",
+ shape_slice[0], shape_slice[1]
+ );
+ println!(
+ "PASS: Strides are correct: [{}, {}]",
+ strides_slice[0], strides_slice[1]
+ );
// Free memory
if let Some(deleter) = managed.deleter {
@@ -183,12 +217,21 @@ fn test_single_encode_dlpack_2d_shape() {
assert_eq!(tensor.ndim, 2, "Single encode should be 2D");
let shape_slice = std::slice::from_raw_parts(tensor.shape, tensor.ndim
as usize);
- assert_eq!(shape_slice[0], 1, "First dimension should be 1 for single
encode");
+ assert_eq!(
+ shape_slice[0], 1,
+ "First dimension should be 1 for single encode"
+ );
assert_eq!(shape_slice[1], 16, "Second dimension should be [2^4]");
let strides_slice = std::slice::from_raw_parts(tensor.strides,
tensor.ndim as usize);
- assert_eq!(strides_slice[0], 16, "Stride for first dimension should be
state_len");
- assert_eq!(strides_slice[1], 1, "Stride for second dimension should be
1");
+ assert_eq!(
+ strides_slice[0], 16,
+ "Stride for first dimension should be state_len"
+ );
+ assert_eq!(
+ strides_slice[1], 1,
+ "Stride for second dimension should be 1"
+ );
println!(
"PASS: Single encode returns 2D shape: [{}, {}]",
@@ -227,7 +270,10 @@ fn test_dlpack_device_id() {
let tensor = &managed.dl_tensor;
// Verify device_id is correctly set (0 for device 0)
- assert_eq!(tensor.device.device_id, 0, "device_id should be 0 for
device 0");
+ assert_eq!(
+ tensor.device.device_id, 0,
+ "device_id should be 0 for device 0"
+ );
// Verify device_type is CUDA (kDLCUDA = 2)
use qdp_core::dlpack::DLDeviceType;
@@ -236,7 +282,10 @@ fn test_dlpack_device_id() {
_ => panic!("Expected CUDA device type"),
}
- println!("PASS: DLPack device_id correctly set to {}",
tensor.device.device_id);
+ println!(
+ "PASS: DLPack device_id correctly set to {}",
+ tensor.device.device_id
+ );
// Free memory
if let Some(deleter) = managed.deleter {
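
Both the batch and single-sample encode paths hand back a 2D, row-major DLPack tensor; the reshaped assertions above pin down its shape and strides. Stated once as a function of the encode parameters (a sketch, not a helper in the crate):

    /// Shape and strides the tests above expect: one row per sample,
    /// 2^num_qubits amplitudes per row, rows laid out contiguously.
    fn expected_layout(num_samples: usize, num_qubits: usize) -> ([i64; 2], [i64; 2]) {
        let state_len = 1i64 << num_qubits;
        ([num_samples as i64, state_len], [state_len, 1])
    }

    // expected_layout(1, 4) == ([1, 16], [16, 1]), which is exactly what
    // test_single_encode_dlpack_2d_shape asserts.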
diff --git a/qdp/qdp-core/tests/arrow_ipc_io.rs
b/qdp/qdp-core/tests/arrow_ipc_io.rs
index 1a9289c08..9f6dc739d 100644
--- a/qdp/qdp-core/tests/arrow_ipc_io.rs
+++ b/qdp/qdp-core/tests/arrow_ipc_io.rs
@@ -14,10 +14,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use qdp_core::io::read_arrow_ipc_batch;
-use arrow::array::{Float64Array, FixedSizeListArray};
+use arrow::array::{FixedSizeListArray, Float64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::ipc::writer::FileWriter as ArrowFileWriter;
+use qdp_core::io::read_arrow_ipc_batch;
use std::fs::{self, File};
use std::sync::Arc;
@@ -38,12 +38,8 @@ fn test_read_arrow_ipc_fixed_size_list() {
// Write Arrow IPC with FixedSizeList format
let values_array = Float64Array::from(all_values.clone());
let field = Arc::new(Field::new("item", DataType::Float64, false));
- let list_array = FixedSizeListArray::new(
- field,
- sample_size as i32,
- Arc::new(values_array),
- None,
- );
+ let list_array =
+ FixedSizeListArray::new(field, sample_size as i32,
Arc::new(values_array), None);
let schema = Arc::new(Schema::new(vec![Field::new(
"data",
@@ -54,11 +50,9 @@ fn test_read_arrow_ipc_fixed_size_list() {
false,
)]));
- let batch = arrow::record_batch::RecordBatch::try_new(
- schema.clone(),
- vec![Arc::new(list_array)],
- )
- .unwrap();
+ let batch =
+ arrow::record_batch::RecordBatch::try_new(schema.clone(),
vec![Arc::new(list_array)])
+ .unwrap();
let file = File::create(temp_path).unwrap();
let mut writer = ArrowFileWriter::try_new(file, &schema).unwrap();
@@ -87,10 +81,13 @@ fn test_read_arrow_ipc_list() {
let sample_size = 8;
// Create test data with List format
- let mut list_builder =
arrow::array::ListBuilder::new(Float64Array::builder(num_samples *
sample_size));
+ let mut list_builder =
+ arrow::array::ListBuilder::new(Float64Array::builder(num_samples *
sample_size));
for i in 0..num_samples {
- let values: Vec<f64> = (0..sample_size).map(|j| (i * sample_size + j)
as f64).collect();
+ let values: Vec<f64> = (0..sample_size)
+ .map(|j| (i * sample_size + j) as f64)
+ .collect();
list_builder.values().append_slice(&values);
list_builder.append(true);
}
@@ -103,11 +100,9 @@ fn test_read_arrow_ipc_list() {
false,
)]));
- let batch = arrow::record_batch::RecordBatch::try_new(
- schema.clone(),
- vec![Arc::new(list_array)],
- )
- .unwrap();
+ let batch =
+ arrow::record_batch::RecordBatch::try_new(schema.clone(),
vec![Arc::new(list_array)])
+ .unwrap();
let file = File::create(temp_path).unwrap();
let mut writer = ArrowFileWriter::try_new(file, &schema).unwrap();
@@ -141,7 +136,9 @@ fn test_arrow_ipc_inconsistent_sizes_fails() {
list_builder.append(true);
// Second sample: 8 elements (inconsistent!)
- list_builder.values().append_slice(&[5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0,
12.0]);
+ list_builder
+ .values()
+ .append_slice(&[5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0]);
list_builder.append(true);
let list_array = list_builder.finish();
@@ -152,11 +149,9 @@ fn test_arrow_ipc_inconsistent_sizes_fails() {
false,
)]));
- let batch = arrow::record_batch::RecordBatch::try_new(
- schema.clone(),
- vec![Arc::new(list_array)],
- )
- .unwrap();
+ let batch =
+ arrow::record_batch::RecordBatch::try_new(schema.clone(),
vec![Arc::new(list_array)])
+ .unwrap();
let file = File::create(temp_path).unwrap();
let mut writer = ArrowFileWriter::try_new(file, &schema).unwrap();
@@ -194,12 +189,8 @@ fn test_arrow_ipc_large_batch() {
// Write as FixedSizeList
let values_array = Float64Array::from(all_values.clone());
let field = Arc::new(Field::new("item", DataType::Float64, false));
- let list_array = FixedSizeListArray::new(
- field,
- sample_size as i32,
- Arc::new(values_array),
- None,
- );
+ let list_array =
+ FixedSizeListArray::new(field, sample_size as i32,
Arc::new(values_array), None);
let schema = Arc::new(Schema::new(vec![Field::new(
"data",
@@ -210,11 +201,9 @@ fn test_arrow_ipc_large_batch() {
false,
)]));
- let batch = arrow::record_batch::RecordBatch::try_new(
- schema.clone(),
- vec![Arc::new(list_array)],
- )
- .unwrap();
+ let batch =
+ arrow::record_batch::RecordBatch::try_new(schema.clone(),
vec![Arc::new(list_array)])
+ .unwrap();
let file = File::create(temp_path).unwrap();
let mut writer = ArrowFileWriter::try_new(file, &schema).unwrap();
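
Several of the tests above build the same FixedSizeList<Float64> column before handing the file to read_arrow_ipc_batch. Pulled out into one helper, the write side looks roughly like this (a sketch; write_fixed_size_list is not part of the test suite):

    use std::{fs::File, sync::Arc};

    use arrow::array::{FixedSizeListArray, Float64Array};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::error::ArrowError;
    use arrow::ipc::writer::FileWriter;
    use arrow::record_batch::RecordBatch;

    /// Write `values` as rows of `sample_size` f64 elements in a single
    /// FixedSizeList column named "data", the layout the reader expects.
    fn write_fixed_size_list(path: &str, values: Vec<f64>, sample_size: i32) -> Result<(), ArrowError> {
        let item = Arc::new(Field::new("item", DataType::Float64, false));
        let list = FixedSizeListArray::new(
            item.clone(),
            sample_size,
            Arc::new(Float64Array::from(values)),
            None,
        );
        let schema = Arc::new(Schema::new(vec![Field::new(
            "data",
            DataType::FixedSizeList(item, sample_size),
            false,
        )]));
        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list)])?;
        let mut writer = FileWriter::try_new(File::create(path)?, &schema)?;
        writer.write(&batch)?;
        writer.finish()
    }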
diff --git a/qdp/qdp-core/tests/memory_safety.rs
b/qdp/qdp-core/tests/memory_safety.rs
index 7084c071f..d18ac562b 100644
--- a/qdp/qdp-core/tests/memory_safety.rs
+++ b/qdp/qdp-core/tests/memory_safety.rs
@@ -116,11 +116,17 @@ fn test_dlpack_tensor_metadata_default() {
assert_eq!(shape[1], 1024, "Second dimension should be 1024 (2^10)");
let strides = std::slice::from_raw_parts(tensor.strides, tensor.ndim
as usize);
- assert_eq!(strides[0], 1024, "Stride for first dimension should be
state_len");
+ assert_eq!(
+ strides[0], 1024,
+ "Stride for first dimension should be state_len"
+ );
assert_eq!(strides[1], 1, "Stride for second dimension should be 1");
assert_eq!(tensor.dtype.code, 5, "Should be complex type (code=5)");
- assert_eq!(tensor.dtype.bits, 128, "Should be 128 bits (2x64-bit
floats, Float64)");
+ assert_eq!(
+ tensor.dtype.bits, 128,
+ "Should be 128 bits (2x64-bit floats, Float64)"
+ );
println!("PASS: DLPack metadata verified");
println!(" ndim: {}", tensor.ndim);
@@ -166,7 +172,10 @@ fn test_dlpack_tensor_metadata_f64() {
assert_eq!(shape[1], 1024, "Second dimension should be 1024 (2^10)");
let strides = std::slice::from_raw_parts(tensor.strides, tensor.ndim
as usize);
- assert_eq!(strides[0], 1024, "Stride for first dimension should be
state_len");
+ assert_eq!(
+ strides[0], 1024,
+ "Stride for first dimension should be state_len"
+ );
assert_eq!(strides[1], 1, "Stride for second dimension should be 1");
assert_eq!(tensor.dtype.code, 5, "Should be complex type (code=5)");
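
The dtype assertions above amount to one contract for complex128 state vectors, stated directly below as a standalone helper (assumed for illustration, not taken from the crate):

    /// DLPack dtype the tests above check: type code 5 (complex) with
    /// 128 bits per element, i.e. two packed f64 values.
    fn is_complex128(code: u8, bits: u8) -> bool {
        code == 5 && bits == 128
    }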
diff --git a/qdp/qdp-core/tests/parquet_io.rs b/qdp/qdp-core/tests/parquet_io.rs
index 7b45573ba..133495099 100644
--- a/qdp/qdp-core/tests/parquet_io.rs
+++ b/qdp/qdp-core/tests/parquet_io.rs
@@ -14,10 +14,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use qdp_core::io::{
- read_parquet, read_parquet_to_arrow, write_arrow_to_parquet, write_parquet,
-};
use arrow::array::Float64Array;
+use qdp_core::io::{read_parquet, read_parquet_to_arrow,
write_arrow_to_parquet, write_parquet};
use std::fs;
mod common;
diff --git a/qdp/qdp-core/tests/preprocessing.rs
b/qdp/qdp-core/tests/preprocessing.rs
index cc7885943..bd1958308 100644
--- a/qdp/qdp-core/tests/preprocessing.rs
+++ b/qdp/qdp-core/tests/preprocessing.rs
@@ -14,8 +14,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use qdp_core::preprocessing::Preprocessor;
use qdp_core::MahoutError;
+use qdp_core::preprocessing::Preprocessor;
#[test]
fn test_validate_input_success() {
@@ -37,21 +37,27 @@ fn test_validate_input_zero_qubits() {
fn test_validate_input_too_many_qubits() {
let data = vec![1.0];
let result = Preprocessor::validate_input(&data, 31);
- assert!(matches!(result, Err(MahoutError::InvalidInput(msg)) if
msg.contains("exceeds practical limit")));
+ assert!(
+ matches!(result, Err(MahoutError::InvalidInput(msg)) if
msg.contains("exceeds practical limit"))
+ );
}
#[test]
fn test_validate_input_empty_data() {
let data: Vec<f64> = vec![];
let result = Preprocessor::validate_input(&data, 1);
- assert!(matches!(result, Err(MahoutError::InvalidInput(msg)) if
msg.contains("cannot be empty")));
+ assert!(
+ matches!(result, Err(MahoutError::InvalidInput(msg)) if
msg.contains("cannot be empty"))
+ );
}
#[test]
fn test_validate_input_data_too_large() {
let data = vec![1.0, 0.0, 0.0]; // 3 elements
let result = Preprocessor::validate_input(&data, 1); // max size 2^1 = 2
- assert!(matches!(result, Err(MahoutError::InvalidInput(msg)) if
msg.contains("exceeds state vector size")));
+ assert!(
+ matches!(result, Err(MahoutError::InvalidInput(msg)) if
msg.contains("exceeds state vector size"))
+ );
}
#[test]
diff --git a/qdp/qdp-core/tests/validation.rs b/qdp/qdp-core/tests/validation.rs
index 6fc591e53..7ac25eaf2 100644
--- a/qdp/qdp-core/tests/validation.rs
+++ b/qdp/qdp-core/tests/validation.rs
@@ -16,7 +16,7 @@
// Input validation and error handling tests
-use qdp_core::{QdpEngine, MahoutError};
+use qdp_core::{MahoutError, QdpEngine};
mod common;
@@ -37,7 +37,10 @@ fn test_input_validation_invalid_strategy() {
match result {
Err(MahoutError::InvalidInput(msg)) => {
- assert!(msg.contains("Unknown encoder"), "Error message should
mention unknown encoder");
+ assert!(
+ msg.contains("Unknown encoder"),
+ "Error message should mention unknown encoder"
+ );
println!("PASS: Correctly rejected invalid strategy: {}", msg);
}
_ => panic!("Expected InvalidInput error for invalid strategy"),
@@ -58,12 +61,17 @@ fn test_input_validation_qubit_mismatch() {
// 100 elements need 7 qubits (2^7=128), but we request 6 (2^6=64)
let result = engine.encode(&data, 6, "amplitude");
- assert!(result.is_err(), "Should reject data larger than state vector");
+ assert!(
+ result.is_err(),
+ "Should reject data larger than state vector"
+ );
match result {
Err(MahoutError::InvalidInput(msg)) => {
- assert!(msg.contains("exceeds state vector size"),
- "Error should mention size mismatch");
+ assert!(
+ msg.contains("exceeds state vector size"),
+ "Error should mention size mismatch"
+ );
println!("PASS: Correctly rejected qubit mismatch: {}", msg);
}
_ => panic!("Expected InvalidInput error for size mismatch"),
@@ -87,7 +95,10 @@ fn test_input_validation_zero_qubits() {
match result {
Err(MahoutError::InvalidInput(msg)) => {
- assert!(msg.contains("at least 1"), "Error should mention minimum
qubit requirement");
+ assert!(
+ msg.contains("at least 1"),
+ "Error should mention minimum qubit requirement"
+ );
println!("PASS: Correctly rejected zero qubits: {}", msg);
}
_ => panic!("Expected InvalidInput error for zero qubits"),
@@ -111,8 +122,10 @@ fn test_input_validation_max_qubits() {
match result {
Err(MahoutError::InvalidInput(msg)) => {
- assert!(msg.contains("exceeds") && msg.contains("30"),
- "Error should mention 30 qubit limit");
+ assert!(
+ msg.contains("exceeds") && msg.contains("30"),
+ "Error should mention 30 qubit limit"
+ );
println!("PASS: Correctly rejected excessive qubits: {}", msg);
}
_ => panic!("Expected InvalidInput error for max qubits"),
@@ -135,8 +148,10 @@ fn test_input_validation_batch_zero_samples() {
match result {
Err(MahoutError::InvalidInput(msg)) => {
- assert!(msg.contains("num_samples must be greater than 0"),
- "Error should mention num_samples requirement");
+ assert!(
+ msg.contains("num_samples must be greater than 0"),
+ "Error should mention num_samples requirement"
+ );
println!("PASS: Correctly rejected zero num_samples: {}", msg);
}
_ => panic!("Expected InvalidInput error for zero num_samples"),
diff --git a/qdp/qdp-kernels/build.rs b/qdp/qdp-kernels/build.rs
index 3b1bd8e8b..d25a88d9e 100644
--- a/qdp/qdp-kernels/build.rs
+++ b/qdp/qdp-kernels/build.rs
@@ -31,23 +31,21 @@ fn main() {
println!("cargo:rerun-if-changed=src/amplitude.cu");
// Check if CUDA is available by looking for nvcc
- let has_cuda = Command::new("nvcc")
- .arg("--version")
- .output()
- .is_ok();
+ let has_cuda = Command::new("nvcc").arg("--version").output().is_ok();
if !has_cuda {
println!("cargo:warning=CUDA not found (nvcc not in PATH). Skipping
kernel compilation.");
println!("cargo:warning=This is expected on macOS or non-CUDA
environments.");
- println!("cargo:warning=The project will build, but GPU functionality
will not be available.");
+ println!(
+ "cargo:warning=The project will build, but GPU functionality will
not be available."
+ );
println!("cargo:warning=For production deployment, ensure CUDA toolkit
is installed.");
return;
}
// Get CUDA installation path
// Priority: CUDA_PATH env var > /usr/local/cuda (default Linux location)
- let cuda_path = env::var("CUDA_PATH")
- .unwrap_or_else(|_| "/usr/local/cuda".to_string());
+ let cuda_path = env::var("CUDA_PATH").unwrap_or_else(|_|
"/usr/local/cuda".to_string());
println!("cargo:rustc-link-search=native={}/lib64", cuda_path);
println!("cargo:rustc-link-lib=cudart");
@@ -64,8 +62,8 @@ fn main() {
build
.cuda(true)
- .flag("-cudart=shared") // Use shared CUDA runtime
- .flag("-std=c++17") // C++17 for modern CUDA features
+ .flag("-cudart=shared") // Use shared CUDA runtime
+ .flag("-std=c++17") // C++17 for modern CUDA features
// GPU architecture targets
// SM 75 = Turing (T4, RTX 2000 series)
// SM 80 = Ampere (A100, RTX 3000 series)
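
The build script compiles the CUDA kernels only when nvcc can be spawned, and resolves the toolkit root from CUDA_PATH with /usr/local/cuda as the fallback. Condensed, the detection amounts to the following (sketch only; cuda_root is not a function in build.rs):

    use std::{env, process::Command};

    /// Toolkit root to link against, or None when nvcc is absent, in which
    /// case kernel compilation is skipped as the warnings above describe.
    fn cuda_root() -> Option<String> {
        Command::new("nvcc").arg("--version").output().ok()?;
        Some(env::var("CUDA_PATH").unwrap_or_else(|_| "/usr/local/cuda".to_string()))
    }

A non-default install can be selected at build time with something like CUDA_PATH=/opt/cuda cargo build -p qdp-kernels (the /opt/cuda path is only an example).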
diff --git a/qdp/qdp-kernels/src/lib.rs b/qdp/qdp-kernels/src/lib.rs
index d9fc0a163..4eda08696 100644
--- a/qdp/qdp-kernels/src/lib.rs
+++ b/qdp/qdp-kernels/src/lib.rs
@@ -24,8 +24,8 @@ use std::ffi::c_void;
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct CuDoubleComplex {
- pub x: f64, // Real part
- pub y: f64, // Imaginary part
+ pub x: f64, // Real part
+ pub y: f64, // Imaginary part
}
// Implement DeviceRepr for cudarc compatibility
@@ -40,8 +40,8 @@ unsafe impl cudarc::driver::ValidAsZeroBits for
CuDoubleComplex {}
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct CuComplex {
- pub x: f32, // Real part
- pub y: f32, // Imaginary part
+ pub x: f32, // Real part
+ pub y: f32, // Imaginary part
}
// Implement DeviceRepr for cudarc compatibility
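
Keeping these mirror structs #[repr(C)] with the same x/y field order as CUDA's cuComplex and cuDoubleComplex is what lets device pointers cross the FFI boundary unchanged. A compile-time check of the expected sizes (a sketch; these asserts are not in the crate):

    use qdp_kernels::{CuComplex, CuDoubleComplex};

    // Two f32 fields and two f64 fields respectively, with no padding.
    const _: () = assert!(std::mem::size_of::<CuComplex>() == 8);
    const _: () = assert!(std::mem::size_of::<CuDoubleComplex>() == 16);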
diff --git a/qdp/qdp-kernels/tests/amplitude_encode.rs
b/qdp/qdp-kernels/tests/amplitude_encode.rs
index 4223dd0bb..beebbf068 100644
--- a/qdp/qdp-kernels/tests/amplitude_encode.rs
+++ b/qdp/qdp-kernels/tests/amplitude_encode.rs
@@ -20,12 +20,8 @@
use cudarc::driver::{CudaDevice, DevicePtr, DevicePtrMut};
#[cfg(target_os = "linux")]
use qdp_kernels::{
- CuComplex,
- CuDoubleComplex,
- launch_amplitude_encode,
- launch_amplitude_encode_f32,
- launch_l2_norm,
- launch_l2_norm_batch,
+ CuComplex, CuDoubleComplex, launch_amplitude_encode,
launch_amplitude_encode_f32,
+ launch_l2_norm, launch_l2_norm_batch,
};
const EPSILON: f64 = 1e-10;
@@ -138,15 +134,36 @@ fn test_amplitude_encode_basic_f32() {
let state_h = device.dtoh_sync_copy(&state_d).unwrap();
- assert!((state_h[0].x - 0.6).abs() < EPSILON_F32, "First element should be
0.6");
- assert!(state_h[0].y.abs() < EPSILON_F32, "First element imaginary should
be 0");
- assert!((state_h[1].x - 0.8).abs() < EPSILON_F32, "Second element should
be 0.8");
- assert!(state_h[1].y.abs() < EPSILON_F32, "Second element imaginary should
be 0");
- assert!(state_h[2].x.abs() < EPSILON_F32, "Third element should be 0");
- assert!(state_h[3].x.abs() < EPSILON_F32, "Fourth element should be 0");
+ assert!(
+ (state_h[0].x - 0.6).abs() < EPSILON_F32,
+ "First element should be 0.6"
+ );
+ assert!(
+ state_h[0].y.abs() < EPSILON_F32,
+ "First element imaginary should be 0"
+ );
+ assert!(
+ (state_h[1].x - 0.8).abs() < EPSILON_F32,
+ "Second element should be 0.8"
+ );
+ assert!(
+ state_h[1].y.abs() < EPSILON_F32,
+ "Second element imaginary should be 0"
+ );
+ assert!(
+ state_h[2].x.abs() < EPSILON_F32,
+ "Third element should be 0"
+ );
+ assert!(
+ state_h[3].x.abs() < EPSILON_F32,
+ "Fourth element should be 0"
+ );
let total_prob: f32 = state_h.iter().map(|c| c.x * c.x + c.y * c.y).sum();
- assert!((total_prob - 1.0).abs() < EPSILON_F32, "Total probability should
be 1.0");
+ assert!(
+ (total_prob - 1.0).abs() < EPSILON_F32,
+ "Total probability should be 1.0"
+ );
println!("PASS: Basic float32 amplitude encoding works correctly");
}
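
The float32 assertions above reduce to two invariants: the encoded amplitudes match the normalized input, and the squared magnitudes sum to one. The second can be phrased as a small helper (a sketch, not a function in the test file):

    use qdp_kernels::CuComplex;

    /// Sum of |amplitude|^2 over an encoded state; amplitude encoding should
    /// leave this within EPSILON_F32 of 1.0, as the final assertion above checks.
    fn total_probability(state: &[CuComplex]) -> f32 {
        state.iter().map(|c| c.x * c.x + c.y * c.y).sum()
    }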
diff --git a/qdp/qdp-python/src/lib.rs b/qdp/qdp-python/src/lib.rs
index 642a9a7fa..df68be1bc 100644
--- a/qdp/qdp-python/src/lib.rs
+++ b/qdp/qdp-python/src/lib.rs
@@ -14,11 +14,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use pyo3::prelude::*;
use pyo3::exceptions::PyRuntimeError;
use pyo3::ffi;
-use qdp_core::{Precision, QdpEngine as CoreEngine};
+use pyo3::prelude::*;
use qdp_core::dlpack::DLManagedTensor;
+use qdp_core::{Precision, QdpEngine as CoreEngine};
/// Quantum tensor wrapper implementing DLPack protocol
///
@@ -53,10 +53,10 @@ impl QuantumTensor {
/// RuntimeError: If the tensor has already been consumed
#[pyo3(signature = (stream=None))]
fn __dlpack__<'py>(&mut self, py: Python<'py>, stream: Option<i64>) ->
PyResult<Py<PyAny>> {
- let _ = stream; // Suppress unused variable warning
+ let _ = stream; // Suppress unused variable warning
if self.consumed {
return Err(PyRuntimeError::new_err(
- "DLPack tensor already consumed (can only be used once)"
+ "DLPack tensor already consumed (can only be used once)",
));
}
@@ -78,7 +78,7 @@ impl QuantumTensor {
let capsule_ptr = ffi::PyCapsule_New(
self.ptr as *mut std::ffi::c_void,
DLTENSOR_NAME.as_ptr() as *const i8,
- None // No destructor - PyTorch handles it
+ None, // No destructor - PyTorch handles it
);
if capsule_ptr.is_null() {
@@ -170,7 +170,7 @@ impl QdpEngine {
return Err(PyRuntimeError::new_err(format!(
"Unsupported precision '{}'. Use 'float32' (default) or
'float64'.",
other
- )))
+ )));
}
};
@@ -199,8 +199,15 @@ impl QdpEngine {
/// >>> torch_tensor = torch.from_dlpack(qtensor)
///
/// TODO: Use numpy array input (`PyReadonlyArray1<f64>`) for zero-copy
instead of `Vec<f64>`.
- fn encode(&self, data: Vec<f64>, num_qubits: usize, encoding_method: &str)
-> PyResult<QuantumTensor> {
- let ptr = self.engine.encode(&data, num_qubits, encoding_method)
+ fn encode(
+ &self,
+ data: Vec<f64>,
+ num_qubits: usize,
+ encoding_method: &str,
+ ) -> PyResult<QuantumTensor> {
+ let ptr = self
+ .engine
+ .encode(&data, num_qubits, encoding_method)
.map_err(|e| PyRuntimeError::new_err(format!("Encoding failed:
{}", e)))?;
Ok(QuantumTensor {
ptr,
@@ -222,8 +229,15 @@ impl QdpEngine {
/// >>> engine = QdpEngine(device_id=0)
/// >>> batched = engine.encode_from_parquet("data.parquet", 16,
"amplitude")
/// >>> torch_tensor = torch.from_dlpack(batched) # Shape: [200,
65536]
- fn encode_from_parquet(&self, path: &str, num_qubits: usize,
encoding_method: &str) -> PyResult<QuantumTensor> {
- let ptr = self.engine.encode_from_parquet(path, num_qubits,
encoding_method)
+ fn encode_from_parquet(
+ &self,
+ path: &str,
+ num_qubits: usize,
+ encoding_method: &str,
+ ) -> PyResult<QuantumTensor> {
+ let ptr = self
+ .engine
+ .encode_from_parquet(path, num_qubits, encoding_method)
.map_err(|e| PyRuntimeError::new_err(format!("Encoding from
parquet failed: {}", e)))?;
Ok(QuantumTensor {
ptr,
@@ -245,9 +259,18 @@ impl QdpEngine {
/// >>> engine = QdpEngine(device_id=0)
/// >>> batched = engine.encode_from_arrow_ipc("data.arrow", 16,
"amplitude")
/// >>> torch_tensor = torch.from_dlpack(batched)
- fn encode_from_arrow_ipc(&self, path: &str, num_qubits: usize,
encoding_method: &str) -> PyResult<QuantumTensor> {
- let ptr = self.engine.encode_from_arrow_ipc(path, num_qubits,
encoding_method)
- .map_err(|e| PyRuntimeError::new_err(format!("Encoding from Arrow
IPC failed: {}", e)))?;
+ fn encode_from_arrow_ipc(
+ &self,
+ path: &str,
+ num_qubits: usize,
+ encoding_method: &str,
+ ) -> PyResult<QuantumTensor> {
+ let ptr = self
+ .engine
+ .encode_from_arrow_ipc(path, num_qubits, encoding_method)
+ .map_err(|e| {
+ PyRuntimeError::new_err(format!("Encoding from Arrow IPC
failed: {}", e))
+ })?;
Ok(QuantumTensor {
ptr,
consumed: false,