rich7420 commented on code in PR #751:
URL: https://github.com/apache/mahout/pull/751#discussion_r2648437169


##########
qdp/qdp-core/src/gpu/pipeline.rs:
##########
@@ -24,90 +24,116 @@ use std::ffi::c_void;
 use cudarc::driver::{CudaDevice, CudaSlice, DevicePtr, safe::CudaStream};
 use crate::error::{MahoutError, Result};
 #[cfg(target_os = "linux")]
-use crate::gpu::memory::{ensure_device_memory_available, map_allocation_error, 
PinnedBuffer};
-
+use crate::gpu::memory::{ensure_device_memory_available, map_allocation_error};
+#[cfg(target_os = "linux")]
+use crate::gpu::buffer_pool::{PinnedBufferPool, PinnedBufferHandle};
 #[cfg(target_os = "linux")]
 use crate::gpu::cuda_ffi::{
-    cudaEventCreateWithFlags, cudaEventDestroy, cudaEventRecord, 
cudaMemcpyAsync, cudaStreamSynchronize,
-    cudaStreamWaitEvent, CUDA_EVENT_DISABLE_TIMING, CUDA_MEMCPY_HOST_TO_DEVICE,
+    cudaEventCreateWithFlags,
+    cudaEventDestroy,
+    cudaEventRecord,
+    cudaMemcpyAsync,
+    cudaStreamSynchronize,
+    cudaStreamWaitEvent,
+    CUDA_EVENT_DISABLE_TIMING,
+    CUDA_MEMCPY_HOST_TO_DEVICE,
 };
 
-/// Dual-stream pipeline context: manages compute/copy streams and sync events
+/// Dual-stream context coordinating copy/compute with an event.
 #[cfg(target_os = "linux")]
 pub struct PipelineContext {
     pub stream_compute: CudaStream,
     pub stream_copy: CudaStream,
-    event_copy_done: *mut c_void,
+    events_copy_done: Vec<*mut c_void>,
 }
 
 #[cfg(target_os = "linux")]
+#[allow(unsafe_op_in_unsafe_fn)]
 impl PipelineContext {
-    /// Create dual streams and sync event
-    pub fn new(device: &Arc<CudaDevice>) -> Result<Self> {
+    pub fn new(device: &Arc<CudaDevice>, event_slots: usize) -> Result<Self> {
         let stream_compute = device.fork_default_stream()
-            .map_err(|e| MahoutError::Cuda(format!("{:?}", e)))?;
+            .map_err(|e| MahoutError::Cuda(format!("Failed to create compute 
stream: {:?}", e)))?;
         let stream_copy = device.fork_default_stream()
-            .map_err(|e| MahoutError::Cuda(format!("{:?}", e)))?;
+            .map_err(|e| MahoutError::Cuda(format!("Failed to create copy 
stream: {:?}", e)))?;
 
-        let mut event_copy_done: *mut c_void = std::ptr::null_mut();
-        unsafe {
-            let ret = cudaEventCreateWithFlags(&mut event_copy_done, 
CUDA_EVENT_DISABLE_TIMING);
-            if ret != 0 {
-                return Err(MahoutError::Cuda(format!("Failed to create CUDA 
event: {}", ret)));
+        let mut events_copy_done = Vec::with_capacity(event_slots);
+        for _ in 0..event_slots {
+            let mut ev: *mut c_void = std::ptr::null_mut();
+            unsafe {
+                let ret = cudaEventCreateWithFlags(&mut ev, 
CUDA_EVENT_DISABLE_TIMING);
+                if ret != 0 {
+                    return Err(MahoutError::Cuda(format!("Failed to create 
CUDA event: {}", ret)));
+                }
             }
+            events_copy_done.push(ev);
         }
 
         Ok(Self {
             stream_compute,
             stream_copy,
-            event_copy_done,
+            events_copy_done,
         })
     }
 
-    /// Async H2D copy on copy stream
-    pub unsafe fn async_copy_to_device(&self, src: &PinnedBuffer, dst: *mut 
c_void, len_elements: usize) {
+    /// Async H2D copy on the copy stream.
+    pub unsafe fn async_copy_to_device(
+        &self,
+        src: *const c_void,
+        dst: *mut c_void,
+        len_elements: usize,
+    ) -> Result<()> {
         crate::profile_scope!("GPU::H2D_Copy");
-        unsafe {
-            cudaMemcpyAsync(
-                dst,
-                src.ptr() as *const c_void,
-                len_elements * std::mem::size_of::<f64>(),
-                CUDA_MEMCPY_HOST_TO_DEVICE,
-                self.stream_copy.stream as *mut c_void
-            );
+        let ret = cudaMemcpyAsync(
+            dst,
+            src,
+            len_elements * std::mem::size_of::<f64>(),
+            CUDA_MEMCPY_HOST_TO_DEVICE,
+            self.stream_copy.stream as *mut c_void,
+        );
+        if ret != 0 {
+            return Err(MahoutError::Cuda(format!("Async H2D copy failed with 
CUDA error: {}", ret)));
         }
+        Ok(())
     }
 
-    /// Record copy completion event
-    pub unsafe fn record_copy_done(&self) {
-        unsafe {
-            cudaEventRecord(self.event_copy_done, self.stream_copy.stream as 
*mut c_void);
+    /// Record completion of the copy on the copy stream.
+    pub unsafe fn record_copy_done(&self, slot: usize) -> Result<()> {
+        let ret = cudaEventRecord(self.events_copy_done[slot], 
self.stream_copy.stream as *mut c_void);
+        if ret != 0 {
+            return Err(MahoutError::Cuda(format!("cudaEventRecord failed: {}", 
ret)));
         }
+        Ok(())
     }
 
-    /// Make compute stream wait for copy completion
-    pub unsafe fn wait_for_copy(&self) {
+    /// Make compute stream wait for the copy completion event.
+    pub unsafe fn wait_for_copy(&self, slot: usize) -> Result<()> {
         crate::profile_scope!("GPU::StreamWait");
-        unsafe {
-            cudaStreamWaitEvent(self.stream_compute.stream as *mut c_void, 
self.event_copy_done, 0);
+        let ret = cudaStreamWaitEvent(self.stream_compute.stream as *mut 
c_void, self.events_copy_done[slot], 0);

Review Comment:
   Should we add a bounds check on `slot` here, before indexing `events_copy_done[slot]`?
   Something like:
   ```rust
   if slot >= self.events_copy_done.len() {
       return Err(MahoutError::InvalidInput(format!(
           "Event slot {} out of range (max: {})",
           slot, self.events_copy_done.len() - 1
       )));
   }
   ```
   or something along those lines.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to