This is an automated email from the ASF dual-hosted git repository.
jiekaichang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/mahout.git
The following commit(s) were added to refs/heads/main by this push:
new e686a91cb [QDP] Add Angle Encoding Async Pipeline for Large Batch Uploads (#928)
e686a91cb is described below
commit e686a91cbffd8ccdd0ec4f9f1d515dc1f7acb9e4
Author: Jie-Kai Chang <[email protected]>
AuthorDate: Tue Jan 27 19:50:38 2026 +0800
[QDP] Add Angle Encoding Async Pipeline for Large Batch Uploads (#928)
* angle async pipeline
Signed-off-by: 400Ping <[email protected]>
* fix pre-commit
Signed-off-by: 400Ping <[email protected]>
* update
Signed-off-by: 400Ping <[email protected]>
* fix pre-commit
Signed-off-by: 400Ping <[email protected]>
---------
Signed-off-by: 400Ping <[email protected]>
---
qdp/qdp-core/src/gpu/encodings/angle.rs | 80 ++++++++++++++++++++++++++++++
qdp/qdp-core/src/gpu/pipeline.rs | 87 +++++++++++++++++++++++++++++----
qdp/qdp-core/tests/api_workflow.rs | 74 ++++++++++++++++++++++++++++
3 files changed, 232 insertions(+), 9 deletions(-)
diff --git a/qdp/qdp-core/src/gpu/encodings/angle.rs b/qdp/qdp-core/src/gpu/encodings/angle.rs
index 769a7e96d..353a9f4c4 100644
--- a/qdp/qdp-core/src/gpu/encodings/angle.rs
+++ b/qdp/qdp-core/src/gpu/encodings/angle.rs
@@ -25,6 +25,8 @@ use super::{QuantumEncoder, validate_qubit_count};
use crate::error::cuda_error_to_string;
use crate::error::{MahoutError, Result};
use crate::gpu::memory::GpuStateVector;
+#[cfg(target_os = "linux")]
+use crate::gpu::pipeline::run_dual_stream_pipeline_aligned;
use cudarc::driver::CudaDevice;
use std::sync::Arc;
@@ -152,6 +154,18 @@ impl QuantumEncoder for AngleEncoder {
let state_len = 1 << num_qubits;
+ const ASYNC_THRESHOLD_ELEMENTS: usize = 1024 * 1024 / std::mem::size_of::<f64>(); // 1MB
+ if batch_data.len() >= ASYNC_THRESHOLD_ELEMENTS {
+ return Self::encode_batch_async_pipeline(
+ device,
+ batch_data,
+ num_samples,
+ sample_size,
+ num_qubits,
+ state_len,
+ );
+ }
+
let batch_state_vector = {
crate::profile_scope!("GPU::AllocBatch");
GpuStateVector::new_batch(device, num_samples, num_qubits)?
@@ -231,3 +245,69 @@ impl QuantumEncoder for AngleEncoder {
"Angle encoding: per-qubit rotations into a product state"
}
}
+
+impl AngleEncoder {
+ #[cfg(target_os = "linux")]
+ fn encode_batch_async_pipeline(
+ device: &Arc<CudaDevice>,
+ batch_data: &[f64],
+ num_samples: usize,
+ sample_size: usize,
+ num_qubits: usize,
+ state_len: usize,
+ ) -> Result<GpuStateVector> {
+ let batch_state_vector = {
+ crate::profile_scope!("GPU::AllocBatch");
+ GpuStateVector::new_batch(device, num_samples, num_qubits)?
+ };
+
+ let state_ptr = batch_state_vector.ptr_f64().ok_or_else(|| {
+ MahoutError::InvalidInput(
+ "Batch state vector precision mismatch (expected float64
buffer)".to_string(),
+ )
+ })?;
+
+ run_dual_stream_pipeline_aligned(
+ device,
+ batch_data,
+ sample_size,
+ |stream, input_ptr, chunk_offset, chunk_len| {
+ if chunk_len % sample_size != 0 || chunk_offset % sample_size != 0 {
+ return Err(MahoutError::InvalidInput(
+ "Angle batch chunk is not aligned to sample
size".to_string(),
+ ));
+ }
+
+ let chunk_samples = chunk_len / sample_size;
+ let sample_offset = chunk_offset / sample_size;
+ let offset_elements = sample_offset.checked_mul(state_len).ok_or_else(|| {
+ MahoutError::InvalidInput("Angle batch output offset
overflow".to_string())
+ })?;
+
+ let state_ptr_offset = unsafe { state_ptr.add(offset_elements) as *mut c_void };
+ let ret = unsafe {
+ qdp_kernels::launch_angle_encode_batch(
+ input_ptr,
+ state_ptr_offset,
+ chunk_samples,
+ state_len,
+ num_qubits as u32,
+ stream.stream as *mut c_void,
+ )
+ };
+
+ if ret != 0 {
+ return Err(MahoutError::KernelLaunch(format!(
+ "Batch angle encoding kernel failed: {} ({})",
+ ret,
+ cuda_error_to_string(ret)
+ )));
+ }
+
+ Ok(())
+ },
+ )?;
+
+ Ok(batch_state_vector)
+ }
+}
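For context, the dispatch added above switches to the async path once the flattened batch reaches 1 MB of float64 data; smaller batches keep the existing synchronous allocation-and-launch path. A minimal illustrative sketch of the threshold arithmetic (the helper function below is hypothetical, for illustration only, and is not part of this diff):

    // 1 MB of f64 data: 1_048_576 bytes / 8 bytes per element = 131_072 elements.
    const ASYNC_THRESHOLD_ELEMENTS: usize = 1024 * 1024 / std::mem::size_of::<f64>();

    // Hypothetical helper: batch_data.len() equals num_samples * sample_size.
    fn takes_async_path(num_samples: usize, sample_size: usize) -> bool {
        // e.g. 32_768 samples x 4 features = 131_072 elements, exactly at the threshold
        num_samples * sample_size >= ASYNC_THRESHOLD_ELEMENTS
    }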
diff --git a/qdp/qdp-core/src/gpu/pipeline.rs b/qdp/qdp-core/src/gpu/pipeline.rs
index 26874ab3b..75db1f1d2 100644
--- a/qdp/qdp-core/src/gpu/pipeline.rs
+++ b/qdp/qdp-core/src/gpu/pipeline.rs
@@ -241,34 +241,102 @@ impl Drop for PipelineContext {
pub fn run_dual_stream_pipeline<F>(
device: &Arc<CudaDevice>,
host_data: &[f64],
- mut kernel_launcher: F,
+ kernel_launcher: F,
) -> Result<()>
where
F: FnMut(&CudaStream, *const f64, usize, usize) -> Result<()>,
{
crate::profile_scope!("GPU::AsyncPipeline");
- // Pinned host staging pool sized to the current chunking strategy (double-buffer by default).
const CHUNK_SIZE_ELEMENTS: usize = 8 * 1024 * 1024 / std::mem::size_of::<f64>(); // 8MB
+ run_dual_stream_pipeline_with_chunk_size(
+ device,
+ host_data,
+ CHUNK_SIZE_ELEMENTS,
+ kernel_launcher,
+ )
+}
+
+/// Executes a task using dual-stream double-buffering with aligned chunk boundaries.
+///
+/// `align_elements` must evenly divide the host data length and ensures chunks do not
+/// split logical records (e.g., per-sample data in batch encoding).
+#[cfg(target_os = "linux")]
+pub fn run_dual_stream_pipeline_aligned<F>(
+ device: &Arc<CudaDevice>,
+ host_data: &[f64],
+ align_elements: usize,
+ kernel_launcher: F,
+) -> Result<()>
+where
+ F: FnMut(&CudaStream, *const f64, usize, usize) -> Result<()>,
+{
+ crate::profile_scope!("GPU::AsyncPipelineAligned");
+
+ if align_elements == 0 {
+ return Err(MahoutError::InvalidInput(
+ "Alignment must be greater than zero".to_string(),
+ ));
+ }
+ if !host_data.len().is_multiple_of(align_elements) {
+ return Err(MahoutError::InvalidInput(format!(
+ "Host data length {} is not aligned to {} elements",
+ host_data.len(),
+ align_elements
+ )));
+ }
+
+ const BASE_CHUNK_SIZE_ELEMENTS: usize = 8 * 1024 * 1024 / std::mem::size_of::<f64>(); // 8MB
+ let chunk_size_elements = if align_elements >= BASE_CHUNK_SIZE_ELEMENTS {
+ align_elements
+ } else {
+ BASE_CHUNK_SIZE_ELEMENTS - (BASE_CHUNK_SIZE_ELEMENTS % align_elements)
+ };
+
+ run_dual_stream_pipeline_with_chunk_size(
+ device,
+ host_data,
+ chunk_size_elements,
+ kernel_launcher,
+ )
+}
+
+#[cfg(target_os = "linux")]
+fn run_dual_stream_pipeline_with_chunk_size<F>(
+ device: &Arc<CudaDevice>,
+ host_data: &[f64],
+ chunk_size_elements: usize,
+ mut kernel_launcher: F,
+) -> Result<()>
+where
+ F: FnMut(&CudaStream, *const f64, usize, usize) -> Result<()>,
+{
+ if chunk_size_elements == 0 {
+ return Err(MahoutError::InvalidInput(
+ "Chunk size must be greater than zero".to_string(),
+ ));
+ }
+
+ // Pinned host staging pool sized to the current chunking strategy (double-buffer by default).
const PINNED_POOL_SIZE: usize = 2; // double buffering
// 1. Create dual streams with per-slot events to coordinate copy -> compute
let ctx = PipelineContext::new(device, PINNED_POOL_SIZE)?;
- let pinned_pool = PinnedBufferPool::new(PINNED_POOL_SIZE, CHUNK_SIZE_ELEMENTS)
+ let pinned_pool = PinnedBufferPool::new(PINNED_POOL_SIZE, chunk_size_elements)
.map_err(|e| MahoutError::Cuda(format!("Failed to create pinned buffer pool: {}", e)))?;
- // 2. Chunk size: 8MB per chunk (balance between overhead and overlap opportunity)
- // TODO: tune dynamically based on GPU/PCIe bandwidth.
-
// 3. Keep temporary buffers alive until all streams complete
// This prevents Rust from dropping them while GPU is still using them
let mut keep_alive_buffers: Vec<CudaSlice<f64>> = Vec::new();
// Keep pinned buffers alive until the copy stream has completed their H2D copy
let mut in_flight_pinned: Vec<PinnedBufferHandle> = Vec::new();
- let mut global_offset = 0;
-
// 4. Pipeline loop: copy on copy stream, compute on compute stream with event handoff
- for (chunk_idx, chunk) in host_data.chunks(CHUNK_SIZE_ELEMENTS).enumerate() {
+ let mut chunk_idx = 0usize;
+ let mut global_offset = 0usize;
+ while global_offset < host_data.len() {
+ let remaining = host_data.len() - global_offset;
+ let chunk_len = remaining.min(chunk_size_elements);
+ let chunk = &host_data[global_offset..global_offset + chunk_len];
let chunk_offset = global_offset;
let event_slot = chunk_idx % PINNED_POOL_SIZE;
@@ -330,6 +398,7 @@ where
// Update offset for next chunk
global_offset += chunk.len();
+ chunk_idx += 1;
}
// 5. Synchronize all streams: wait for all work to complete
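For context, `run_dual_stream_pipeline_aligned` keeps the 8MB base chunk but rounds it down to the nearest multiple of `align_elements`, so a chunk never splits a logical record; when a single record is at least 8MB, the chunk grows to hold exactly one record. A minimal illustrative sketch of that computation (the free function below is hypothetical and not part of this diff):

    const BASE: usize = 8 * 1024 * 1024 / std::mem::size_of::<f64>(); // 1_048_576 f64 elements

    fn aligned_chunk_size(align_elements: usize) -> usize {
        if align_elements >= BASE {
            align_elements // one logical record per chunk
        } else {
            BASE - (BASE % align_elements) // largest multiple of the alignment within 8MB
        }
    }

    // aligned_chunk_size(4) == 1_048_576 (8MB worth of elements is already a multiple of 4)
    // aligned_chunk_size(3) == 1_048_575 (rounded down to a multiple of 3)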
diff --git a/qdp/qdp-core/tests/api_workflow.rs b/qdp/qdp-core/tests/api_workflow.rs
index ff7f0d77c..bc94d4e34 100644
--- a/qdp/qdp-core/tests/api_workflow.rs
+++ b/qdp/qdp-core/tests/api_workflow.rs
@@ -16,7 +16,13 @@
// API workflow tests: Engine initialization and encoding
+#[cfg(target_os = "linux")]
+use cudarc::driver::CudaDevice;
+#[cfg(target_os = "linux")]
+use qdp_core::MahoutError;
use qdp_core::QdpEngine;
+#[cfg(target_os = "linux")]
+use qdp_core::gpu::pipeline::run_dual_stream_pipeline_aligned;
mod common;
@@ -113,6 +119,74 @@ fn test_amplitude_encoding_async_pipeline() {
}
}
+#[test]
+#[cfg(target_os = "linux")]
+fn test_angle_encoding_async_pipeline() {
+ println!("Testing angle encoding async pipeline path...");
+
+ let engine = match QdpEngine::new(0) {
+ Ok(e) => e,
+ Err(_) => {
+ println!("SKIP: No GPU available");
+ return;
+ }
+ };
+
+ let num_qubits = 4;
+ let sample_size = num_qubits;
+ let num_samples = 32768; // 32768 * 4 = 131072 elements (>= 1MB threshold)
+ let batch_data = common::create_test_data(num_samples * sample_size);
+
+ let result = engine.encode_batch(&batch_data, num_samples, sample_size, num_qubits, "angle");
+ let dlpack_ptr = result.expect("Angle batch encoding should succeed");
+ assert!(!dlpack_ptr.is_null(), "DLPack pointer should not be null");
+ println!("PASS: Angle batch encoding succeeded, DLPack pointer valid");
+
+ unsafe {
+ let managed = &mut *dlpack_ptr;
+ assert!(managed.deleter.is_some(), "Deleter must be present");
+
+ println!("Calling deleter to free GPU memory");
+ let deleter = managed
+ .deleter
+ .take()
+ .expect("Deleter function pointer is missing!");
+ deleter(dlpack_ptr);
+ println!("PASS: Memory freed successfully");
+ }
+}
+
+#[test]
+#[cfg(target_os = "linux")]
+fn test_angle_async_alignment_error() {
+ println!("Testing angle async pipeline alignment error...");
+
+ let device = match CudaDevice::new(0) {
+ Ok(d) => d,
+ Err(_) => {
+ println!("SKIP: No GPU available");
+ return;
+ }
+ };
+
+ let misaligned_data = vec![0.0_f64; 10];
+ let result =
+ run_dual_stream_pipeline_aligned(&device, &misaligned_data, 4, |_, _, _, _| Ok(()));
+
+ match result {
+ Err(MahoutError::InvalidInput(msg)) => {
+ assert!(
+ msg.contains("not aligned"),
+ "Expected alignment error, got: {}",
+ msg
+ );
+ println!("PASS: Alignment error surfaced as expected");
+ }
+ Err(e) => panic!("Unexpected error: {:?}", e),
+ Ok(_) => panic!("Expected alignment error, got Ok"),
+ }
+}
+
#[test]
#[cfg(target_os = "linux")]
fn test_batch_dlpack_2d_shape() {