Copilot commented on code in PR #675:
URL: https://github.com/apache/mahout/pull/675#discussion_r2585117499
##########
qdp/qdp-core/src/gpu/encodings/amplitude.rs:
##########
@@ -115,6 +123,108 @@ impl QuantumEncoder for AmplitudeEncoder {
}
}
+impl AmplitudeEncoder {
+ /// Async pipeline encoding for large data (SSS-tier optimization)
+ ///
+ /// Uses the generic dual-stream pipeline infrastructure to overlap
+ /// data transfer and computation. The pipeline handles all the
+ /// streaming mechanics, while this method focuses on the amplitude
+ /// encoding kernel logic.
+ #[cfg(target_os = "linux")]
+ fn encode_async_pipeline(
+ device: &Arc<CudaDevice>,
+ host_data: &[f64],
+ _num_qubits: usize,
+ state_len: usize,
+ norm: f64,
+ state_vector: &GpuStateVector,
+ ) -> Result<()> {
+ // Use generic pipeline infrastructure
+ // The closure handles amplitude-specific kernel launch logic
+ run_dual_stream_pipeline(device, host_data, |stream, input_ptr,
chunk_offset, chunk_len| {
+ // Calculate offset pointer for state vector (type-safe pointer
arithmetic)
+ // Offset is in complex numbers (CuDoubleComplex), not f64 elements
+ let state_ptr_offset = unsafe {
+ state_vector.ptr().cast::<u8>()
+ .add(chunk_offset *
std::mem::size_of::<qdp_kernels::CuDoubleComplex>())
+ .cast::<std::ffi::c_void>()
+ };
+
+ // Launch amplitude encoding kernel on the provided stream
+ let ret = unsafe {
+ launch_amplitude_encode(
+ input_ptr,
+ state_ptr_offset,
+ chunk_len,
+ state_len,
+ norm,
+ stream.stream as *mut c_void,
+ )
+ };
+
+ if ret != 0 {
+ let error_msg = format!(
+ "Kernel launch failed with CUDA error code: {} ({})",
+ ret,
+ cuda_error_to_string(ret)
+ );
+ return Err(MahoutError::KernelLaunch(error_msg));
+ }
+
+ Ok(())
+ })?;
+
+ // CRITICAL FIX: Handle padding for uninitialized memory
+ // Since we use alloc() (uninitialized), we must zero-fill any tail
region
+ // that wasn't written by the pipeline. This ensures correctness when
+ // host_data.len() < state_len (e.g., 1000 elements in a 1024-element
state).
+ let data_len = host_data.len();
+ if data_len < state_len {
+ let padding_start = data_len;
+ let padding_elements = state_len - padding_start;
+ let padding_bytes = padding_elements *
std::mem::size_of::<qdp_kernels::CuDoubleComplex>();
+
+ // Calculate tail pointer (in complex numbers)
+ let tail_ptr = unsafe {
+ state_vector.ptr().add(padding_start) as *mut c_void
+ };
+
+ // Zero-fill padding region using CUDA Runtime API
+ // Use default stream since pipeline streams are already
synchronized
+ unsafe {
+ unsafe extern "C" {
+ fn cudaMemsetAsync(
+ devPtr: *mut c_void,
+ value: i32,
+ count: usize,
+ stream: *mut c_void,
+ ) -> i32;
+ }
Review Comment:
Nested `unsafe` blocks are redundant. The outer `unsafe` block on line 194
already covers the scope, so the inner `unsafe extern "C"` block on line 195
doesn't provide additional safety documentation. Consider moving the FFI
declaration outside the function or to a module level to avoid nesting and
improve clarity.
##########
qdp/qdp-core/src/gpu/encodings/amplitude.rs:
##########
@@ -89,12 +94,15 @@ impl QuantumEncoder for AmplitudeEncoder {
return Err(MahoutError::KernelLaunch(error_msg));
}
- // Block until all work on the device is complete
{
crate::profile_scope!("GPU::Synchronize");
_device
.synchronize()
.map_err(|e| MahoutError::Cuda(format!("CUDA device
synchronize failed: {:?}", e)))?;
+ }
+ } else {
+ // Async Pipeline path for large data
+ Self::encode_async_pipeline(_device, host_data, num_qubits,
state_len, norm, &state_vector)?;
Review Comment:
The new async pipeline path lacks test coverage. While the synchronous path
is tested with 1024 elements in `test_amplitude_encoding_workflow`, there are
no tests exercising the async pipeline (which triggers for data >=
ASYNC_THRESHOLD = 131072 elements). Consider adding a test with large data
(e.g., 200000 elements) to verify the async pipeline, stream synchronization,
and padding logic work correctly.
##########
qdp/qdp-core/src/gpu/encodings/amplitude.rs:
##########
@@ -57,23 +58,27 @@ impl QuantumEncoder for AmplitudeEncoder {
GpuStateVector::new(_device, num_qubits)?
};
- // Copy input data to GPU (synchronous, zero-copy from slice)
- // TODO : Use async CUDA streams for pipeline overlap
+ // SSS-Tier Optimization: Async Pipeline for large data
+ // For small data (< 1MB), use synchronous path to avoid stream
overhead
+ // For large data, use dual-stream async pipeline for maximum
throughput
+ const ASYNC_THRESHOLD: usize = 1024 * 1024 /
std::mem::size_of::<f64>(); // 1MB threshold
+
+ if host_data.len() < ASYNC_THRESHOLD {
+ // Synchronous path for small data (avoids stream overhead)
let input_slice = {
crate::profile_scope!("GPU::H2DCopy");
_device.htod_sync_copy(host_data)
.map_err(|e| MahoutError::MemoryAllocation(format!("Failed
to allocate input buffer: {:?}", e)))?
};
- // Launch CUDA kernel (CPU-side launch only; execution is
asynchronous)
let ret = {
crate::profile_scope!("GPU::KernelLaunch");
unsafe {
launch_amplitude_encode(
*input_slice.device_ptr() as *const f64,
state_vector.ptr() as *mut c_void,
- host_data.len() as i32,
- state_len as i32,
+ host_data.len(),
+ state_len,
Review Comment:
Inconsistent indentation for function arguments. Lines 80-81 have extra
indentation compared to the other arguments (lines 78-79, 82-83). All arguments
should be aligned consistently, either all at the same indentation level or
using a consistent style.
```suggestion
host_data.len(),
state_len,
```
##########
qdp/qdp-core/src/gpu/pipeline.rs:
##########
@@ -0,0 +1,174 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Async Pipeline Infrastructure
+//
+// Provides generic double-buffered execution for large data processing.
+// Separates the "streaming mechanics" from the "kernel logic".
+
+use std::sync::Arc;
+use std::ffi::c_void;
+use cudarc::driver::{CudaDevice, CudaSlice, DevicePtr, safe::CudaStream};
+use crate::error::{MahoutError, Result};
+
+/// Chunk processing callback for async pipeline
+///
+/// This closure is called for each chunk with:
+/// - `stream`: The CUDA stream to launch the kernel on
+/// - `input_ptr`: Device pointer to the chunk data (already copied)
+/// - `chunk_offset`: Global offset in the original data (in elements)
+/// - `chunk_len`: Length of this chunk (in elements)
+pub type ChunkProcessor = dyn FnMut(&CudaStream, *const f64, usize, usize) ->
Result<()>;
+
+/// Executes a task using dual-stream double-buffering pattern
+///
+/// This function handles the generic pipeline mechanics:
+/// - Dual stream creation and management
+/// - Data chunking and async H2D copy
+/// - Buffer lifetime management
+/// - Stream synchronization
+///
+/// The caller provides a `kernel_launcher` closure that handles the
+/// specific kernel launch logic for each chunk.
+///
+/// # Arguments
+/// * `device` - The CUDA device
+/// * `host_data` - Full source data to process
+/// * `kernel_launcher` - Closure that launches the specific kernel for each
chunk
+///
+/// # Example
+/// ```rust,ignore
+/// run_dual_stream_pipeline(device, host_data, |stream, input_ptr, offset,
len| {
+/// // Launch your specific kernel here
+/// launch_my_kernel(input_ptr, offset, len, stream)?;
+/// Ok(())
+/// })?;
+/// ```
+#[cfg(target_os = "linux")]
+pub fn run_dual_stream_pipeline<F>(
+ device: &Arc<CudaDevice>,
+ host_data: &[f64],
+ mut kernel_launcher: F,
+) -> Result<()>
+where
+ F: FnMut(&CudaStream, *const f64, usize, usize) -> Result<()>,
+{
+ crate::profile_scope!("GPU::AsyncPipeline");
+
+ // 1. Create dual streams for pipeline overlap
+ let stream1 = device.fork_default_stream()
+ .map_err(|e| MahoutError::Cuda(format!("Failed to create stream 1:
{:?}", e)))?;
+ let stream2 = device.fork_default_stream()
+ .map_err(|e| MahoutError::Cuda(format!("Failed to create stream 2:
{:?}", e)))?;
+ let streams = [&stream1, &stream2];
+
+ // 2. Chunk size: 8MB per chunk (balance between overhead and overlap
opportunity)
+ // TODO: we should tune this dynamically based on the detected GPU model
or PCIe bandwidth in the future.
+ // Too small = launch overhead dominates, too large = less overlap
+ const CHUNK_SIZE_ELEMENTS: usize = 8 * 1024 * 1024 /
std::mem::size_of::<f64>(); // 8MB
+
+ // 3. Keep temporary buffers alive until all streams complete
+ // This prevents Rust from dropping them while GPU is still using them
+ let mut keep_alive_buffers: Vec<CudaSlice<f64>> = Vec::new();
+
+ let mut global_offset = 0;
+
+ // 4. Pipeline loop: alternate between streams for maximum overlap
+ for (chunk_idx, chunk) in
host_data.chunks(CHUNK_SIZE_ELEMENTS).enumerate() {
+ let current_stream = streams[chunk_idx % 2];
+
+ crate::profile_scope!("GPU::ChunkProcess");
+
+ // Allocate temporary device buffer for this chunk
+ let input_chunk_dev = unsafe {
+ device.alloc::<f64>(chunk.len())
+ }.map_err(|e| MahoutError::MemoryAllocation(
+ format!("Failed to allocate chunk buffer: {:?}", e)
+ ))?;
+
+ // Async copy: host to device (non-blocking, on specified stream)
+ // Uses CUDA Runtime API (cudaMemcpyAsync) for true async copy
+ {
+ crate::profile_scope!("GPU::H2DCopyAsync");
+ unsafe {
+ unsafe extern "C" {
+ fn cudaMemcpyAsync(
+ dst: *mut c_void,
+ src: *const c_void,
+ count: usize,
+ kind: u32,
+ stream: *mut c_void,
+ ) -> i32;
+ }
+
+ let dst_device_ptr = *input_chunk_dev.device_ptr() as *mut
c_void;
+ let src_host_ptr = chunk.as_ptr() as *const c_void;
+ let bytes = chunk.len() * std::mem::size_of::<f64>();
+ let stream_handle = current_stream.stream as *mut c_void;
+
+ // cudaMemcpyKind: cudaMemcpyHostToDevice = 1
+ const CUDA_MEMCPY_HOST_TO_DEVICE: u32 = 1;
+
+ let result = cudaMemcpyAsync(
+ dst_device_ptr,
+ src_host_ptr,
+ bytes,
+ CUDA_MEMCPY_HOST_TO_DEVICE,
+ stream_handle,
+ );
+
+ if result != 0 {
+ return Err(MahoutError::MemoryAllocation(
Review Comment:
Incorrect error type for CUDA memory copy failure. `cudaMemcpyAsync` failure
should use `MahoutError::Cuda` rather than `MahoutError::MemoryAllocation`,
since the failure is a CUDA operation error (e.g., invalid pointer, stream
issue) rather than an allocation failure. `MemoryAllocation` is more
appropriate for `cudaMalloc` failures.
```suggestion
return Err(MahoutError::Cuda(
```
##########
qdp/qdp-core/src/gpu/pipeline.rs:
##########
@@ -0,0 +1,174 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Async Pipeline Infrastructure
+//
+// Provides generic double-buffered execution for large data processing.
+// Separates the "streaming mechanics" from the "kernel logic".
+
+use std::sync::Arc;
+use std::ffi::c_void;
+use cudarc::driver::{CudaDevice, CudaSlice, DevicePtr, safe::CudaStream};
+use crate::error::{MahoutError, Result};
+
+/// Chunk processing callback for async pipeline
+///
+/// This closure is called for each chunk with:
+/// - `stream`: The CUDA stream to launch the kernel on
+/// - `input_ptr`: Device pointer to the chunk data (already copied)
+/// - `chunk_offset`: Global offset in the original data (in elements)
+/// - `chunk_len`: Length of this chunk (in elements)
+pub type ChunkProcessor = dyn FnMut(&CudaStream, *const f64, usize, usize) ->
Result<()>;
+
+/// Executes a task using dual-stream double-buffering pattern
+///
+/// This function handles the generic pipeline mechanics:
+/// - Dual stream creation and management
+/// - Data chunking and async H2D copy
+/// - Buffer lifetime management
+/// - Stream synchronization
+///
+/// The caller provides a `kernel_launcher` closure that handles the
+/// specific kernel launch logic for each chunk.
+///
+/// # Arguments
+/// * `device` - The CUDA device
+/// * `host_data` - Full source data to process
+/// * `kernel_launcher` - Closure that launches the specific kernel for each
chunk
+///
+/// # Example
+/// ```rust,ignore
+/// run_dual_stream_pipeline(device, host_data, |stream, input_ptr, offset,
len| {
+/// // Launch your specific kernel here
+/// launch_my_kernel(input_ptr, offset, len, stream)?;
+/// Ok(())
+/// })?;
+/// ```
+#[cfg(target_os = "linux")]
+pub fn run_dual_stream_pipeline<F>(
+ device: &Arc<CudaDevice>,
+ host_data: &[f64],
+ mut kernel_launcher: F,
+) -> Result<()>
+where
+ F: FnMut(&CudaStream, *const f64, usize, usize) -> Result<()>,
+{
+ crate::profile_scope!("GPU::AsyncPipeline");
+
+ // 1. Create dual streams for pipeline overlap
+ let stream1 = device.fork_default_stream()
+ .map_err(|e| MahoutError::Cuda(format!("Failed to create stream 1:
{:?}", e)))?;
+ let stream2 = device.fork_default_stream()
+ .map_err(|e| MahoutError::Cuda(format!("Failed to create stream 2:
{:?}", e)))?;
+ let streams = [&stream1, &stream2];
+
+ // 2. Chunk size: 8MB per chunk (balance between overhead and overlap
opportunity)
+ // TODO: we should tune this dynamically based on the detected GPU model
or PCIe bandwidth in the future.
+ // Too small = launch overhead dominates, too large = less overlap
+ const CHUNK_SIZE_ELEMENTS: usize = 8 * 1024 * 1024 /
std::mem::size_of::<f64>(); // 8MB
+
+ // 3. Keep temporary buffers alive until all streams complete
+ // This prevents Rust from dropping them while GPU is still using them
+ let mut keep_alive_buffers: Vec<CudaSlice<f64>> = Vec::new();
+
+ let mut global_offset = 0;
+
+ // 4. Pipeline loop: alternate between streams for maximum overlap
+ for (chunk_idx, chunk) in
host_data.chunks(CHUNK_SIZE_ELEMENTS).enumerate() {
+ let current_stream = streams[chunk_idx % 2];
+
+ crate::profile_scope!("GPU::ChunkProcess");
+
+ // Allocate temporary device buffer for this chunk
+ let input_chunk_dev = unsafe {
+ device.alloc::<f64>(chunk.len())
+ }.map_err(|e| MahoutError::MemoryAllocation(
+ format!("Failed to allocate chunk buffer: {:?}", e)
+ ))?;
+
+ // Async copy: host to device (non-blocking, on specified stream)
+ // Uses CUDA Runtime API (cudaMemcpyAsync) for true async copy
+ {
+ crate::profile_scope!("GPU::H2DCopyAsync");
+ unsafe {
+ unsafe extern "C" {
+ fn cudaMemcpyAsync(
+ dst: *mut c_void,
+ src: *const c_void,
+ count: usize,
+ kind: u32,
+ stream: *mut c_void,
+ ) -> i32;
+ }
Review Comment:
Nested `unsafe` blocks are redundant. The outer `unsafe` block on line 106
already covers the scope, so the inner `unsafe extern "C"` block on line 107
doesn't provide additional safety documentation. Consider moving the FFI
declaration to module level or outside the function to avoid nesting and
improve code clarity.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]