This is an automated email from the ASF dual-hosted git repository.
guanmingchiu pushed a commit to branch dev-qdp
in repository https://gitbox.apache.org/repos/asf/mahout.git
The following commit(s) were added to refs/heads/dev-qdp by this push:
new 76db5a4c7 [QDP] make dataflow async and improve amplitudeEncoder
(#675)
76db5a4c7 is described below
commit 76db5a4c714638107a1418a8891fe87f6f9a21d0
Author: KUAN-HAO HUANG <[email protected]>
AuthorDate: Thu Dec 4 15:58:03 2025 +0800
[QDP] make dataflow async and improve amplitudeEncoder (#675)
* async pipeline
* prevent error
* Add TODO for dynamic chunk size tuning
Added a TODO comment to tune chunk size dynamically based on GPU model or
PCIe bandwidth.
* update
* add a test
---
qdp/qdp-core/src/gpu/encodings/amplitude.rs | 122 ++++++++++++++++++-
qdp/qdp-core/src/gpu/memory.rs | 2 +
qdp/qdp-core/src/gpu/mod.rs | 2 +
qdp/qdp-core/src/gpu/pipeline.rs | 174 ++++++++++++++++++++++++++++
qdp/qdp-core/tests/api_workflow.rs | 35 ++++++
qdp/qdp-kernels/build.rs | 2 -
qdp/qdp-kernels/src/amplitude.cu | 4 +-
qdp/qdp-kernels/src/lib.rs | 8 +-
qdp/qdp-python/pyproject.toml | 2 +-
qdp/qdp-python/src/lib.rs | 11 +-
qdp/qdp-python/uv.lock | 2 +-
11 files changed, 345 insertions(+), 19 deletions(-)
diff --git a/qdp/qdp-core/src/gpu/encodings/amplitude.rs
b/qdp/qdp-core/src/gpu/encodings/amplitude.rs
index 1709eab77..38551d15c 100644
--- a/qdp/qdp-core/src/gpu/encodings/amplitude.rs
+++ b/qdp/qdp-core/src/gpu/encodings/amplitude.rs
@@ -20,6 +20,7 @@ use std::sync::Arc;
use cudarc::driver::CudaDevice;
use crate::error::{MahoutError, Result};
use crate::gpu::memory::GpuStateVector;
+use crate::gpu::pipeline::run_dual_stream_pipeline;
use super::QuantumEncoder;
#[cfg(target_os = "linux")]
@@ -57,23 +58,27 @@ impl QuantumEncoder for AmplitudeEncoder {
GpuStateVector::new(_device, num_qubits)?
};
- // Copy input data to GPU (synchronous, zero-copy from slice)
- // TODO : Use async CUDA streams for pipeline overlap
+ // SSS-Tier Optimization: Async Pipeline for large data
+ // For small data (< 1MB), use synchronous path to avoid stream
overhead
+ // For large data, use dual-stream async pipeline for maximum
throughput
+ const ASYNC_THRESHOLD: usize = 1024 * 1024 /
std::mem::size_of::<f64>(); // 1MB threshold
+
+ if host_data.len() < ASYNC_THRESHOLD {
+ // Synchronous path for small data (avoids stream overhead)
let input_slice = {
crate::profile_scope!("GPU::H2DCopy");
_device.htod_sync_copy(host_data)
.map_err(|e| MahoutError::MemoryAllocation(format!("Failed
to allocate input buffer: {:?}", e)))?
};
- // Launch CUDA kernel (CPU-side launch only; execution is
asynchronous)
let ret = {
crate::profile_scope!("GPU::KernelLaunch");
unsafe {
launch_amplitude_encode(
*input_slice.device_ptr() as *const f64,
state_vector.ptr() as *mut c_void,
- host_data.len() as i32,
- state_len as i32,
+ host_data.len(),
+ state_len,
norm,
std::ptr::null_mut(), // default stream
)
@@ -89,12 +94,15 @@ impl QuantumEncoder for AmplitudeEncoder {
return Err(MahoutError::KernelLaunch(error_msg));
}
- // Block until all work on the device is complete
{
crate::profile_scope!("GPU::Synchronize");
_device
.synchronize()
.map_err(|e| MahoutError::Cuda(format!("CUDA device
synchronize failed: {:?}", e)))?;
+ }
+ } else {
+ // Async Pipeline path for large data
+ Self::encode_async_pipeline(_device, host_data, num_qubits,
state_len, norm, &state_vector)?;
}
Ok(state_vector)
@@ -115,6 +123,108 @@ impl QuantumEncoder for AmplitudeEncoder {
}
}
+impl AmplitudeEncoder {
+ /// Async pipeline encoding for large data (SSS-tier optimization)
+ ///
+ /// Uses the generic dual-stream pipeline infrastructure to overlap
+ /// data transfer and computation. The pipeline handles all the
+ /// streaming mechanics, while this method focuses on the amplitude
+ /// encoding kernel logic.
+ #[cfg(target_os = "linux")]
+ fn encode_async_pipeline(
+ device: &Arc<CudaDevice>,
+ host_data: &[f64],
+ _num_qubits: usize,
+ state_len: usize,
+ norm: f64,
+ state_vector: &GpuStateVector,
+ ) -> Result<()> {
+ // Use generic pipeline infrastructure
+ // The closure handles amplitude-specific kernel launch logic
+ run_dual_stream_pipeline(device, host_data, |stream, input_ptr,
chunk_offset, chunk_len| {
+ // Calculate offset pointer for state vector (type-safe pointer
arithmetic)
+ // Offset is in complex numbers (CuDoubleComplex), not f64 elements
+ let state_ptr_offset = unsafe {
+ state_vector.ptr().cast::<u8>()
+ .add(chunk_offset *
std::mem::size_of::<qdp_kernels::CuDoubleComplex>())
+ .cast::<std::ffi::c_void>()
+ };
+
+ // Launch amplitude encoding kernel on the provided stream
+ let ret = unsafe {
+ launch_amplitude_encode(
+ input_ptr,
+ state_ptr_offset,
+ chunk_len,
+ state_len,
+ norm,
+ stream.stream as *mut c_void,
+ )
+ };
+
+ if ret != 0 {
+ let error_msg = format!(
+ "Kernel launch failed with CUDA error code: {} ({})",
+ ret,
+ cuda_error_to_string(ret)
+ );
+ return Err(MahoutError::KernelLaunch(error_msg));
+ }
+
+ Ok(())
+ })?;
+
+ // CRITICAL FIX: Handle padding for uninitialized memory
+ // Since we use alloc() (uninitialized), we must zero-fill any tail
region
+ // that wasn't written by the pipeline. This ensures correctness when
+ // host_data.len() < state_len (e.g., 1000 elements in a 1024-element
state).
+ let data_len = host_data.len();
+ if data_len < state_len {
+ let padding_start = data_len;
+ let padding_elements = state_len - padding_start;
+ let padding_bytes = padding_elements *
std::mem::size_of::<qdp_kernels::CuDoubleComplex>();
+
+ // Calculate tail pointer (in complex numbers)
+ let tail_ptr = unsafe {
+ state_vector.ptr().add(padding_start) as *mut c_void
+ };
+
+ // Zero-fill padding region using CUDA Runtime API
+ // Use default stream since pipeline streams are already
synchronized
+ unsafe {
+ unsafe extern "C" {
+ fn cudaMemsetAsync(
+ devPtr: *mut c_void,
+ value: i32,
+ count: usize,
+ stream: *mut c_void,
+ ) -> i32;
+ }
+
+ let result = cudaMemsetAsync(
+ tail_ptr,
+ 0,
+ padding_bytes,
+ std::ptr::null_mut(), // default stream
+ );
+
+ if result != 0 {
+ return Err(MahoutError::Cuda(
+ format!("Failed to zero-fill padding region: {} ({})",
+ result, cuda_error_to_string(result))
+ ));
+ }
+ }
+
+ // Synchronize to ensure padding is complete before returning
+ device.synchronize()
+ .map_err(|e| MahoutError::Cuda(format!("Failed to sync after
padding: {:?}", e)))?;
+ }
+
+ Ok(())
+ }
+}
+
/// Convert CUDA error code to human-readable string
#[cfg(target_os = "linux")]
fn cuda_error_to_string(code: i32) -> &'static str {
diff --git a/qdp/qdp-core/src/gpu/memory.rs b/qdp/qdp-core/src/gpu/memory.rs
index 49f26602a..513c326c0 100644
--- a/qdp/qdp-core/src/gpu/memory.rs
+++ b/qdp/qdp-core/src/gpu/memory.rs
@@ -60,6 +60,8 @@ impl GpuStateVector {
#[cfg(target_os = "linux")]
{
// Use uninitialized allocation to avoid memory bandwidth waste.
+ // TODO: Consider using a memory pool for input buffers to avoid
repeated
+ // cudaMalloc overhead in high-frequency encode() calls.
let slice = unsafe {
_device.alloc::<CuDoubleComplex>(_size_elements)
}.map_err(|e| MahoutError::MemoryAllocation(
diff --git a/qdp/qdp-core/src/gpu/mod.rs b/qdp/qdp-core/src/gpu/mod.rs
index 91966b8d2..fe7cdace0 100644
--- a/qdp/qdp-core/src/gpu/mod.rs
+++ b/qdp/qdp-core/src/gpu/mod.rs
@@ -16,6 +16,8 @@
pub mod memory;
pub mod encodings;
+pub mod pipeline;
pub use memory::GpuStateVector;
pub use encodings::{QuantumEncoder, AmplitudeEncoder, AngleEncoder,
BasisEncoder, get_encoder};
+pub use pipeline::run_dual_stream_pipeline;
diff --git a/qdp/qdp-core/src/gpu/pipeline.rs b/qdp/qdp-core/src/gpu/pipeline.rs
new file mode 100644
index 000000000..fd9a5989d
--- /dev/null
+++ b/qdp/qdp-core/src/gpu/pipeline.rs
@@ -0,0 +1,174 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Async Pipeline Infrastructure
+//
+// Provides generic double-buffered execution for large data processing.
+// Separates the "streaming mechanics" from the "kernel logic".
+
+use std::sync::Arc;
+use std::ffi::c_void;
+use cudarc::driver::{CudaDevice, CudaSlice, DevicePtr, safe::CudaStream};
+use crate::error::{MahoutError, Result};
+
+/// Chunk processing callback for async pipeline
+///
+/// This closure is called for each chunk with:
+/// - `stream`: The CUDA stream to launch the kernel on
+/// - `input_ptr`: Device pointer to the chunk data (already copied)
+/// - `chunk_offset`: Global offset in the original data (in elements)
+/// - `chunk_len`: Length of this chunk (in elements)
+pub type ChunkProcessor = dyn FnMut(&CudaStream, *const f64, usize, usize) ->
Result<()>;
+
+/// Executes a task using dual-stream double-buffering pattern
+///
+/// This function handles the generic pipeline mechanics:
+/// - Dual stream creation and management
+/// - Data chunking and async H2D copy
+/// - Buffer lifetime management
+/// - Stream synchronization
+///
+/// The caller provides a `kernel_launcher` closure that handles the
+/// specific kernel launch logic for each chunk.
+///
+/// # Arguments
+/// * `device` - The CUDA device
+/// * `host_data` - Full source data to process
+/// * `kernel_launcher` - Closure that launches the specific kernel for each
chunk
+///
+/// # Example
+/// ```rust,ignore
+/// run_dual_stream_pipeline(device, host_data, |stream, input_ptr, offset,
len| {
+/// // Launch your specific kernel here
+/// launch_my_kernel(input_ptr, offset, len, stream)?;
+/// Ok(())
+/// })?;
+/// ```
+#[cfg(target_os = "linux")]
+pub fn run_dual_stream_pipeline<F>(
+ device: &Arc<CudaDevice>,
+ host_data: &[f64],
+ mut kernel_launcher: F,
+) -> Result<()>
+where
+ F: FnMut(&CudaStream, *const f64, usize, usize) -> Result<()>,
+{
+ crate::profile_scope!("GPU::AsyncPipeline");
+
+ // 1. Create dual streams for pipeline overlap
+ let stream1 = device.fork_default_stream()
+ .map_err(|e| MahoutError::Cuda(format!("Failed to create stream 1:
{:?}", e)))?;
+ let stream2 = device.fork_default_stream()
+ .map_err(|e| MahoutError::Cuda(format!("Failed to create stream 2:
{:?}", e)))?;
+ let streams = [&stream1, &stream2];
+
+ // 2. Chunk size: 8MB per chunk (balance between overhead and overlap
opportunity)
+ // TODO: we should tune this dynamically based on the detected GPU model
or PCIe bandwidth in the future.
+ // Too small = launch overhead dominates, too large = less overlap
+ const CHUNK_SIZE_ELEMENTS: usize = 8 * 1024 * 1024 /
std::mem::size_of::<f64>(); // 8MB
+
+ // 3. Keep temporary buffers alive until all streams complete
+ // This prevents Rust from dropping them while GPU is still using them
+ let mut keep_alive_buffers: Vec<CudaSlice<f64>> = Vec::new();
+
+ let mut global_offset = 0;
+
+ // 4. Pipeline loop: alternate between streams for maximum overlap
+ for (chunk_idx, chunk) in
host_data.chunks(CHUNK_SIZE_ELEMENTS).enumerate() {
+ let current_stream = streams[chunk_idx % 2];
+
+ crate::profile_scope!("GPU::ChunkProcess");
+
+ // Allocate temporary device buffer for this chunk
+ let input_chunk_dev = unsafe {
+ device.alloc::<f64>(chunk.len())
+ }.map_err(|e| MahoutError::MemoryAllocation(
+ format!("Failed to allocate chunk buffer: {:?}", e)
+ ))?;
+
+ // Async copy: host to device (non-blocking, on specified stream)
+ // Uses CUDA Runtime API (cudaMemcpyAsync) for true async copy
+ {
+ crate::profile_scope!("GPU::H2DCopyAsync");
+ unsafe {
+ unsafe extern "C" {
+ fn cudaMemcpyAsync(
+ dst: *mut c_void,
+ src: *const c_void,
+ count: usize,
+ kind: u32,
+ stream: *mut c_void,
+ ) -> i32;
+ }
+
+ let dst_device_ptr = *input_chunk_dev.device_ptr() as *mut
c_void;
+ let src_host_ptr = chunk.as_ptr() as *const c_void;
+ let bytes = chunk.len() * std::mem::size_of::<f64>();
+ let stream_handle = current_stream.stream as *mut c_void;
+
+ // cudaMemcpyKind: cudaMemcpyHostToDevice = 1
+ const CUDA_MEMCPY_HOST_TO_DEVICE: u32 = 1;
+
+ let result = cudaMemcpyAsync(
+ dst_device_ptr,
+ src_host_ptr,
+ bytes,
+ CUDA_MEMCPY_HOST_TO_DEVICE,
+ stream_handle,
+ );
+
+ if result != 0 {
+ return Err(MahoutError::Cuda(
+ format!("Async H2D copy failed with CUDA error: {}",
result)
+ ));
+ }
+ }
+ }
+
+ // Get device pointer for kernel launch
+ let input_ptr = *input_chunk_dev.device_ptr() as *const f64;
+
+ // Invoke caller's kernel launcher (non-blocking)
+ {
+ crate::profile_scope!("GPU::KernelLaunchAsync");
+ kernel_launcher(current_stream, input_ptr, global_offset,
chunk.len())?;
+ }
+
+ // Keep buffer alive until synchronization
+ // Critical: Rust will drop CudaSlice when it goes out of scope, which
calls cudaFree.
+ // We must keep these buffers alive until all GPU work completes.
+ keep_alive_buffers.push(input_chunk_dev);
+
+ // Update offset for next chunk
+ global_offset += chunk.len();
+ }
+
+ // 5. Synchronize all streams: wait for all work to complete
+ // This ensures all async copies and kernel launches have finished
+ {
+ crate::profile_scope!("GPU::StreamSync");
+ device.wait_for(&stream1)
+ .map_err(|e| MahoutError::Cuda(format!("Stream 1 sync failed:
{:?}", e)))?;
+ device.wait_for(&stream2)
+ .map_err(|e| MahoutError::Cuda(format!("Stream 2 sync failed:
{:?}", e)))?;
+ }
+
+ // Buffers are dropped here (after sync), freeing GPU memory
+ // This is safe because all GPU operations have completed
+ drop(keep_alive_buffers);
+
+ Ok(())
+}
diff --git a/qdp/qdp-core/tests/api_workflow.rs
b/qdp/qdp-core/tests/api_workflow.rs
index f88b2eb83..a1e97e31a 100644
--- a/qdp/qdp-core/tests/api_workflow.rs
+++ b/qdp/qdp-core/tests/api_workflow.rs
@@ -72,3 +72,38 @@ fn test_amplitude_encoding_workflow() {
println!("PASS: Memory freed successfully");
}
}
+
+#[test]
+#[cfg(target_os = "linux")]
+fn test_amplitude_encoding_async_pipeline() {
+ println!("Testing amplitude encoding async pipeline path...");
+
+ let engine = match QdpEngine::new(0) {
+ Ok(e) => e,
+ Err(_) => {
+ println!("SKIP: No GPU available");
+ return;
+ }
+ };
+
+ // Use 200000 elements to trigger async pipeline path (ASYNC_THRESHOLD =
131072)
+ let data = common::create_test_data(200000);
+ println!("Created test data: {} elements", data.len());
+
+ let result = engine.encode(&data, 18, "amplitude");
+ assert!(result.is_ok(), "Encoding should succeed");
+
+ let dlpack_ptr = result.unwrap();
+ assert!(!dlpack_ptr.is_null(), "DLPack pointer should not be null");
+ println!("PASS: Encoding succeeded, DLPack pointer valid");
+
+ unsafe {
+ let managed = &mut *dlpack_ptr;
+ assert!(managed.deleter.is_some(), "Deleter must be present");
+
+ println!("Calling deleter to free GPU memory");
+ let deleter = managed.deleter.take().expect("Deleter function pointer
is missing!");
+ deleter(dlpack_ptr);
+ println!("PASS: Memory freed successfully");
+ }
+}
diff --git a/qdp/qdp-kernels/build.rs b/qdp/qdp-kernels/build.rs
index 2e3b01b27..c60d27c4a 100644
--- a/qdp/qdp-kernels/build.rs
+++ b/qdp/qdp-kernels/build.rs
@@ -81,6 +81,4 @@ fn main() {
// .flag("arch=compute_89,code=sm_89")
.file("src/amplitude.cu")
.compile("kernels");
-
- println!("cargo:warning=CUDA kernels compiled successfully");
}
diff --git a/qdp/qdp-kernels/src/amplitude.cu b/qdp/qdp-kernels/src/amplitude.cu
index 9e4537a71..c652cc70d 100644
--- a/qdp/qdp-kernels/src/amplitude.cu
+++ b/qdp/qdp-kernels/src/amplitude.cu
@@ -39,8 +39,8 @@ extern "C" {
int launch_amplitude_encode(
const double* input_d,
void* state_d,
- int input_len,
- int state_len,
+ size_t input_len,
+ size_t state_len,
double norm,
cudaStream_t stream
) {
diff --git a/qdp/qdp-kernels/src/lib.rs b/qdp/qdp-kernels/src/lib.rs
index 95970945f..a59733fb8 100644
--- a/qdp/qdp-kernels/src/lib.rs
+++ b/qdp/qdp-kernels/src/lib.rs
@@ -47,8 +47,8 @@ unsafe extern "C" {
pub fn launch_amplitude_encode(
input_d: *const f64,
state_d: *mut c_void,
- input_len: i32,
- state_len: i32,
+ input_len: usize,
+ state_len: usize,
norm: f64,
stream: *mut c_void,
) -> i32;
@@ -62,8 +62,8 @@ unsafe extern "C" {
pub extern "C" fn launch_amplitude_encode(
_input_d: *const f64,
_state_d: *mut c_void,
- _input_len: i32,
- _state_len: i32,
+ _input_len: usize,
+ _state_len: usize,
_norm: f64,
_stream: *mut c_void,
) -> i32 {
diff --git a/qdp/qdp-python/pyproject.toml b/qdp/qdp-python/pyproject.toml
index b109b30fc..b4e262dd8 100644
--- a/qdp/qdp-python/pyproject.toml
+++ b/qdp/qdp-python/pyproject.toml
@@ -17,7 +17,7 @@ dev = [
"maturin>=1.10.2",
"patchelf>=0.17.2.4",
"pytest>=9.0.1",
- "torch>=2.2,<2.3",
+ "torch>=2.2",
]
[[tool.uv.index]]
diff --git a/qdp/qdp-python/src/lib.rs b/qdp/qdp-python/src/lib.rs
index 8d65f3467..18484cf15 100644
--- a/qdp/qdp-python/src/lib.rs
+++ b/qdp/qdp-python/src/lib.rs
@@ -105,7 +105,13 @@ impl Drop for QuantumTensor {
// If consumed, PyTorch/consumer will call the deleter
if !self.consumed && !self.ptr.is_null() {
unsafe {
- // Call the DLPack deleter to properly free memory
+ // Defensive check: qdp-core always provides a deleter
+ debug_assert!(
+ (*self.ptr).deleter.is_some(),
+ "DLManagedTensor from qdp-core should always have a
deleter"
+ );
+
+ // Call the DLPack deleter to free memory
if let Some(deleter) = (*self.ptr).deleter {
deleter(self.ptr);
}
@@ -165,8 +171,7 @@ impl QdpEngine {
/// >>> qtensor = engine.encode([1.0, 2.0, 3.0, 4.0], num_qubits=2,
encoding_method="amplitude")
/// >>> torch_tensor = torch.from_dlpack(qtensor)
///
- /// TODO: Replace Vec<f64> with numpy array input to enable zero-copy
reading.
- /// Consider using the numpy crate (e.g., PyReadonlyArray1<f64>) for
better performance.
+ /// TODO: Use numpy array input (`PyReadonlyArray1<f64>`) for zero-copy
instead of `Vec<f64>`.
fn encode(&self, data: Vec<f64>, num_qubits: usize, encoding_method: &str)
-> PyResult<QuantumTensor> {
let ptr = self.engine.encode(&data, num_qubits, encoding_method)
.map_err(|e| PyRuntimeError::new_err(format!("Encoding failed:
{}", e)))?;
diff --git a/qdp/qdp-python/uv.lock b/qdp/qdp-python/uv.lock
index 07b338f5e..baa699256 100644
--- a/qdp/qdp-python/uv.lock
+++ b/qdp/qdp-python/uv.lock
@@ -348,7 +348,7 @@ dev = [
{ name = "maturin", specifier = ">=1.10.2" },
{ name = "patchelf", specifier = ">=0.17.2.4" },
{ name = "pytest", specifier = ">=9.0.1" },
- { name = "torch", specifier = ">=2.2,<2.3" },
+ { name = "torch", specifier = ">=2.2" },
]
[[package]]