This is an automated email from the ASF dual-hosted git repository.

guanmingchiu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/mahout.git

commit c987a43724a2ca77ca6581149739fc9045544300
Author: Ping <[email protected]>
AuthorDate: Mon Jan 5 01:48:14 2026 +0800

    [QDP] Double-buffered pinned I/O pipeline and faster Parquet decode (#751)
    
    * Double-buffered async I/O for read_parquet_batch
    
    Signed-off-by: 400Ping <[email protected]>
    
    * update
    
    Signed-off-by: 400Ping <[email protected]>
    
    * fix python binding error
    
    Signed-off-by: 400Ping <[email protected]>
    
    * update
    
    Signed-off-by: 400Ping <[email protected]>
    
    * update
    
    Signed-off-by: 400Ping <[email protected]>
    
    * update
    
    Signed-off-by: 400Ping <[email protected]>
    
    * update
    
    Signed-off-by: 400Ping <[email protected]>
    
    * update
    
    Signed-off-by: 400Ping <[email protected]>
    
    * fix build error
    
    Signed-off-by: 400Ping <[email protected]>
    
    * Revert "fix build error"
    
    This reverts commit 3556b5ad72418d9f31c9449b03df7682fd8018d9.
    
    * fix build errors
    
    Signed-off-by: 400Ping <[email protected]>
    
    * update unit test and boundary check
    
    Signed-off-by: 400Ping <[email protected]>
    
    * remove improvement 2
    
    Signed-off-by: 400Ping <[email protected]>
    
    * fix qdp-core error
    
    Signed-off-by: 400Ping <[email protected]>
    
    * fix pre-commit
    
    Signed-off-by: 400Ping <[email protected]>
    
    * [Fix] fix pre-commit errors & warnings
    
    Signed-off-by: 400Ping <[email protected]>
    
    * fix rust linters
    
    Signed-off-by: 400Ping <[email protected]>
    
    * [Fix] handle buffer pool lock poisoning
    
    Signed-off-by: 400Ping <[email protected]>
    
    * [Chore] fix rust linters
    
    Signed-off-by: 400Ping <[email protected]>
    
    * update
    
    Signed-off-by: 400Ping <[email protected]>
    
    * Remove unused func
    
    Signed-off-by: 400Ping <[email protected]>
    
    * update
    
    Signed-off-by: 400Ping <[email protected]>
    
    ---------
    
    Signed-off-by: 400Ping <[email protected]>
---
 qdp/qdp-core/src/gpu/buffer_pool.rs       | 151 +++++++++++++++++
 qdp/qdp-core/src/gpu/memory.rs            |  21 ++-
 qdp/qdp-core/src/gpu/mod.rs               |   4 +
 qdp/qdp-core/src/gpu/pipeline.rs          | 270 ++++++++++++++++++------------
 qdp/qdp-core/src/lib.rs                   |  59 +++----
 qdp/qdp-kernels/tests/amplitude_encode.rs |   4 +-
 6 files changed, 360 insertions(+), 149 deletions(-)

diff --git a/qdp/qdp-core/src/gpu/buffer_pool.rs b/qdp/qdp-core/src/gpu/buffer_pool.rs
new file mode 100644
index 000000000..6604594be
--- /dev/null
+++ b/qdp/qdp-core/src/gpu/buffer_pool.rs
@@ -0,0 +1,151 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Reusable pool of pinned host buffers for staging Disk → Host → GPU transfers.
+//! Intended for producer/consumer pipelines that need a small, fixed set of
+//! page-locked buffers to avoid repeated cudaHostAlloc / cudaFreeHost.
+
+use std::sync::{Arc, Condvar, Mutex, MutexGuard};
+
+use crate::error::{MahoutError, Result};
+use crate::gpu::memory::PinnedHostBuffer;
+
+/// Handle that automatically returns a buffer to the pool on drop.
+#[cfg(target_os = "linux")]
+pub struct PinnedBufferHandle {
+    buffer: Option<PinnedHostBuffer>,
+    pool: Arc<PinnedBufferPool>,
+}
+
+#[cfg(target_os = "linux")]
+impl std::ops::Deref for PinnedBufferHandle {
+    type Target = PinnedHostBuffer;
+
+    fn deref(&self) -> &Self::Target {
+        self.buffer
+            .as_ref()
+            .expect("Buffer already returned to pool")
+    }
+}
+
+#[cfg(target_os = "linux")]
+impl std::ops::DerefMut for PinnedBufferHandle {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        self.buffer
+            .as_mut()
+            .expect("Buffer already returned to pool")
+    }
+}
+
+#[cfg(target_os = "linux")]
+impl Drop for PinnedBufferHandle {
+    fn drop(&mut self) {
+        if let Some(buf) = self.buffer.take() {
+            let mut free = self.pool.lock_free();
+            free.push(buf);
+            self.pool.available_cv.notify_one();
+        }
+    }
+}
+
+/// Pool of pinned host buffers sized for a fixed batch shape.
+#[cfg(target_os = "linux")]
+pub struct PinnedBufferPool {
+    free: Mutex<Vec<PinnedHostBuffer>>,
+    available_cv: Condvar,
+    capacity: usize,
+    elements_per_buffer: usize,
+}
+
+#[cfg(target_os = "linux")]
+impl PinnedBufferPool {
+    /// Create a pool with `pool_size` pinned buffers, each sized for `elements_per_buffer` f64 values.
+    pub fn new(pool_size: usize, elements_per_buffer: usize) -> 
Result<Arc<Self>> {
+        if pool_size == 0 {
+            return Err(MahoutError::InvalidInput(
+                "PinnedBufferPool requires at least one buffer".to_string(),
+            ));
+        }
+        if elements_per_buffer == 0 {
+            return Err(MahoutError::InvalidInput(
+                "PinnedBufferPool buffer size must be greater than 
zero".to_string(),
+            ));
+        }
+
+        let mut buffers = Vec::with_capacity(pool_size);
+        for _ in 0..pool_size {
+            buffers.push(PinnedHostBuffer::new(elements_per_buffer)?);
+        }
+
+        Ok(Arc::new(Self {
+            free: Mutex::new(buffers),
+            available_cv: Condvar::new(),
+            capacity: pool_size,
+            elements_per_buffer,
+        }))
+    }
+
+    fn lock_free(&self) -> MutexGuard<'_, Vec<PinnedHostBuffer>> {
+        // Ignore poisoning to keep the pool usable after a panic elsewhere.
+        self.free
+            .lock()
+            .unwrap_or_else(|poisoned| poisoned.into_inner())
+    }
+
+    /// Acquire a pinned buffer, blocking until one is available.
+    pub fn acquire(self: &Arc<Self>) -> PinnedBufferHandle {
+        let mut free = self.lock_free();
+        loop {
+            if let Some(buffer) = free.pop() {
+                return PinnedBufferHandle {
+                    buffer: Some(buffer),
+                    pool: Arc::clone(self),
+                };
+            }
+            free = self
+                .available_cv
+                .wait(free)
+                .unwrap_or_else(|poisoned| poisoned.into_inner());
+        }
+    }
+
+    /// Try to acquire a pinned buffer from the pool.
+    ///
+    /// Returns `None` if the pool is currently empty; callers can choose to spin/wait
+    /// or fall back to synchronous paths.
+    pub fn try_acquire(self: &Arc<Self>) -> Option<PinnedBufferHandle> {
+        let mut free = self.lock_free();
+        free.pop().map(|buffer| PinnedBufferHandle {
+            buffer: Some(buffer),
+            pool: Arc::clone(self),
+        })
+    }
+
+    /// Number of buffers currently available.
+    pub fn available(&self) -> usize {
+        self.lock_free().len()
+    }
+
+    /// Total number of buffers managed by this pool.
+    pub fn capacity(&self) -> usize {
+        self.capacity
+    }
+
+    /// Fixed element capacity for each buffer in the pool.
+    pub fn elements_per_buffer(&self) -> usize {
+        self.elements_per_buffer
+    }
+}
diff --git a/qdp/qdp-core/src/gpu/memory.rs b/qdp/qdp-core/src/gpu/memory.rs
index 5944c8671..07ec86583 100644
--- a/qdp/qdp-core/src/gpu/memory.rs
+++ b/qdp/qdp-core/src/gpu/memory.rs
@@ -17,6 +17,7 @@ use crate::error::{MahoutError, Result};
 use cudarc::driver::{CudaDevice, CudaSlice, DevicePtr};
 use qdp_kernels::{CuComplex, CuDoubleComplex};
 use std::ffi::c_void;
+#[cfg(target_os = "linux")]
 use std::sync::Arc;
 
 /// Precision of the GPU state vector.
@@ -205,6 +206,7 @@ pub struct GpuStateVector {
     pub size_elements: usize,
     /// Batch size (None for single state)
     pub(crate) num_samples: Option<usize>,
+    /// CUDA device ordinal
     pub device_id: usize,
 }
 
@@ -446,17 +448,17 @@ impl GpuStateVector {
 
 // === Pinned Memory Implementation ===
 
-/// Pinned Host Memory Buffer (Page-Locked)
+/// Pinned Host Memory Buffer (owned allocation).
 ///
-/// Enables DMA for H2D copies, doubling bandwidth and reducing CPU usage.
+/// Allocates page-locked memory to maximize H2D throughput in streaming IO paths.
 #[cfg(target_os = "linux")]
-pub struct PinnedBuffer {
+pub struct PinnedHostBuffer {
     ptr: *mut f64,
     size_elements: usize,
 }
 
 #[cfg(target_os = "linux")]
-impl PinnedBuffer {
+impl PinnedHostBuffer {
     /// Allocate pinned memory
     pub fn new(elements: usize) -> Result<Self> {
         unsafe {
@@ -491,6 +493,11 @@ impl PinnedBuffer {
         unsafe { std::slice::from_raw_parts_mut(self.ptr, self.size_elements) }
     }
 
+    /// Immutable slice view of the pinned region
+    pub fn as_slice(&self) -> &[f64] {
+        unsafe { std::slice::from_raw_parts(self.ptr, self.size_elements) }
+    }
+
     /// Get raw pointer for CUDA memcpy
     pub fn ptr(&self) -> *const f64 {
         self.ptr
@@ -506,7 +513,7 @@ impl PinnedBuffer {
 }
 
 #[cfg(target_os = "linux")]
-impl Drop for PinnedBuffer {
+impl Drop for PinnedHostBuffer {
     fn drop(&mut self) {
         unsafe {
             let result = cudaFreeHost(self.ptr as *mut c_void);
@@ -523,7 +530,7 @@ impl Drop for PinnedBuffer {
 
 // Safety: Pinned memory is accessible from any thread
 #[cfg(target_os = "linux")]
-unsafe impl Send for PinnedBuffer {}
+unsafe impl Send for PinnedHostBuffer {}
 
 #[cfg(target_os = "linux")]
-unsafe impl Sync for PinnedBuffer {}
+unsafe impl Sync for PinnedHostBuffer {}
diff --git a/qdp/qdp-core/src/gpu/mod.rs b/qdp/qdp-core/src/gpu/mod.rs
index c42fe1afe..451da1498 100644
--- a/qdp/qdp-core/src/gpu/mod.rs
+++ b/qdp/qdp-core/src/gpu/mod.rs
@@ -14,6 +14,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#[cfg(target_os = "linux")]
+pub mod buffer_pool;
 pub mod encodings;
 pub mod memory;
 pub mod pipeline;
@@ -21,6 +23,8 @@ pub mod pipeline;
 #[cfg(target_os = "linux")]
 pub(crate) mod cuda_ffi;
 
+#[cfg(target_os = "linux")]
+pub use buffer_pool::{PinnedBufferHandle, PinnedBufferPool};
 pub use encodings::{AmplitudeEncoder, AngleEncoder, BasisEncoder, 
QuantumEncoder, get_encoder};
 pub use memory::GpuStateVector;
 pub use pipeline::run_dual_stream_pipeline;
diff --git a/qdp/qdp-core/src/gpu/pipeline.rs b/qdp/qdp-core/src/gpu/pipeline.rs
index 8c72bf3fa..5acb7d32b 100644
--- a/qdp/qdp-core/src/gpu/pipeline.rs
+++ b/qdp/qdp-core/src/gpu/pipeline.rs
@@ -21,118 +21,177 @@
 
 use crate::error::{MahoutError, Result};
 #[cfg(target_os = "linux")]
-use crate::gpu::memory::{PinnedBuffer, ensure_device_memory_available, 
map_allocation_error};
-use cudarc::driver::{CudaDevice, CudaSlice, DevicePtr, safe::CudaStream};
-use std::ffi::c_void;
-use std::sync::Arc;
-
+use crate::gpu::buffer_pool::{PinnedBufferHandle, PinnedBufferPool};
 #[cfg(target_os = "linux")]
 use crate::gpu::cuda_ffi::{
     CUDA_EVENT_DISABLE_TIMING, CUDA_MEMCPY_HOST_TO_DEVICE, 
cudaEventCreateWithFlags,
     cudaEventDestroy, cudaEventRecord, cudaMemcpyAsync, cudaStreamSynchronize, 
cudaStreamWaitEvent,
 };
+#[cfg(target_os = "linux")]
+use crate::gpu::memory::{ensure_device_memory_available, map_allocation_error};
+use cudarc::driver::{CudaDevice, CudaSlice, DevicePtr, safe::CudaStream};
+use std::ffi::c_void;
+use std::sync::Arc;
 
-/// Dual-stream pipeline context: manages compute/copy streams and sync events
+/// Dual-stream context coordinating copy/compute with an event.
 #[cfg(target_os = "linux")]
 pub struct PipelineContext {
     pub stream_compute: CudaStream,
     pub stream_copy: CudaStream,
-    event_copy_done: *mut c_void,
+    events_copy_done: Vec<*mut c_void>,
+}
+
+#[cfg(target_os = "linux")]
+fn validate_event_slot(events: &[*mut c_void], slot: usize) -> Result<()> {
+    if slot >= events.len() {
+        return Err(MahoutError::InvalidInput(format!(
+            "Event slot {} out of range (max: {})",
+            slot,
+            events.len().saturating_sub(1)
+        )));
+    }
+    Ok(())
 }
 
 #[cfg(target_os = "linux")]
+#[allow(unsafe_op_in_unsafe_fn)]
 impl PipelineContext {
-    /// Create dual streams and sync event
-    pub fn new(device: &Arc<CudaDevice>) -> Result<Self> {
+    pub fn new(device: &Arc<CudaDevice>, event_slots: usize) -> Result<Self> {
         let stream_compute = device
             .fork_default_stream()
-            .map_err(|e| MahoutError::Cuda(format!("{:?}", e)))?;
+            .map_err(|e| MahoutError::Cuda(format!("Failed to create compute 
stream: {:?}", e)))?;
         let stream_copy = device
             .fork_default_stream()
-            .map_err(|e| MahoutError::Cuda(format!("{:?}", e)))?;
+            .map_err(|e| MahoutError::Cuda(format!("Failed to create copy 
stream: {:?}", e)))?;
 
-        let mut event_copy_done: *mut c_void = std::ptr::null_mut();
-        unsafe {
-            let ret = cudaEventCreateWithFlags(&mut event_copy_done, 
CUDA_EVENT_DISABLE_TIMING);
-            if ret != 0 {
-                return Err(MahoutError::Cuda(format!(
-                    "Failed to create CUDA event: {}",
-                    ret
-                )));
+        let mut events_copy_done = Vec::with_capacity(event_slots);
+        for _ in 0..event_slots {
+            let mut ev: *mut c_void = std::ptr::null_mut();
+            unsafe {
+                let ret = cudaEventCreateWithFlags(&mut ev, 
CUDA_EVENT_DISABLE_TIMING);
+                if ret != 0 {
+                    return Err(MahoutError::Cuda(format!(
+                        "Failed to create CUDA event: {}",
+                        ret
+                    )));
+                }
             }
+            events_copy_done.push(ev);
         }
 
         Ok(Self {
             stream_compute,
             stream_copy,
-            event_copy_done,
+            events_copy_done,
         })
     }
 
-    /// Async H2D copy on copy stream
+    /// Async H2D copy on the copy stream.
     ///
     /// # Safety
-    ///
-    /// The caller must ensure that:
-    /// - `dst` points to valid device memory of at least `len_elements * 
sizeof(f64)` bytes
-    /// - `src` is a valid pinned buffer with at least `len_elements` elements
-    /// - The memory regions do not overlap in an undefined way
-    /// - The CUDA stream is valid and properly initialized
+    /// `src` must be valid for `len_elements` `f64` values and properly aligned.
+    /// `dst` must point to device memory for `len_elements` `f64` values on the same device.
+    /// Both pointers must remain valid until the copy completes on `stream_copy`.
     pub unsafe fn async_copy_to_device(
         &self,
-        src: &PinnedBuffer,
+        src: *const c_void,
         dst: *mut c_void,
         len_elements: usize,
-    ) {
+    ) -> Result<()> {
         crate::profile_scope!("GPU::H2D_Copy");
-        unsafe {
-            cudaMemcpyAsync(
-                dst,
-                src.ptr() as *const c_void,
-                len_elements * std::mem::size_of::<f64>(),
-                CUDA_MEMCPY_HOST_TO_DEVICE,
-                self.stream_copy.stream as *mut c_void,
-            );
+        let ret = cudaMemcpyAsync(
+            dst,
+            src,
+            len_elements * std::mem::size_of::<f64>(),
+            CUDA_MEMCPY_HOST_TO_DEVICE,
+            self.stream_copy.stream as *mut c_void,
+        );
+        if ret != 0 {
+            return Err(MahoutError::Cuda(format!(
+                "Async H2D copy failed with CUDA error: {}",
+                ret
+            )));
         }
+        Ok(())
     }
 
-    /// Record copy completion event
+    /// Record completion of the copy on the copy stream.
     ///
     /// # Safety
-    ///
-    /// The caller must ensure that the CUDA event and stream are valid and 
properly initialized.
-    pub unsafe fn record_copy_done(&self) {
-        unsafe {
-            cudaEventRecord(self.event_copy_done, self.stream_copy.stream as 
*mut c_void);
+    /// `slot` must refer to a live event created by this context, and the context must
+    /// remain alive until the event is no longer used by any stream.
+    pub unsafe fn record_copy_done(&self, slot: usize) -> Result<()> {
+        validate_event_slot(&self.events_copy_done, slot)?;
+
+        let ret = cudaEventRecord(
+            self.events_copy_done[slot],
+            self.stream_copy.stream as *mut c_void,
+        );
+        if ret != 0 {
+            return Err(MahoutError::Cuda(format!(
+                "cudaEventRecord failed: {}",
+                ret
+            )));
         }
+        Ok(())
     }
 
-    /// Make compute stream wait for copy completion
+    /// Make compute stream wait for the copy completion event.
     ///
     /// # Safety
-    ///
-    /// The caller must ensure that the compute stream and copy event are 
valid and properly initialized.
-    pub unsafe fn wait_for_copy(&self) {
+    /// `slot` must refer to a live event previously recorded on `stream_copy`, and the
+    /// context and its streams must remain valid while waiting.
+    pub unsafe fn wait_for_copy(&self, slot: usize) -> Result<()> {
         crate::profile_scope!("GPU::StreamWait");
-        unsafe {
-            cudaStreamWaitEvent(
-                self.stream_compute.stream as *mut c_void,
-                self.event_copy_done,
-                0,
-            );
+        validate_event_slot(&self.events_copy_done, slot)?;
+
+        let ret = cudaStreamWaitEvent(
+            self.stream_compute.stream as *mut c_void,
+            self.events_copy_done[slot],
+            0,
+        );
+        if ret != 0 {
+            return Err(MahoutError::Cuda(format!(
+                "cudaStreamWaitEvent failed: {}",
+                ret
+            )));
         }
+        Ok(())
     }
 
-    /// Sync copy stream (safe to reuse host buffer)
+    /// Sync copy stream (safe to reuse host buffer).
     ///
     /// # Safety
-    ///
-    /// The caller must ensure that the copy stream is valid and properly 
initialized.
-    pub unsafe fn sync_copy_stream(&self) {
+    /// The context and its copy stream must be valid and not destroyed while syncing.
+    pub unsafe fn sync_copy_stream(&self) -> Result<()> {
         crate::profile_scope!("Pipeline::SyncCopy");
-        unsafe {
-            cudaStreamSynchronize(self.stream_copy.stream as *mut c_void);
+        let ret = cudaStreamSynchronize(self.stream_copy.stream as *mut 
c_void);
+        if ret != 0 {
+            return Err(MahoutError::Cuda(format!(
+                "cudaStreamSynchronize(copy) failed: {}",
+                ret
+            )));
         }
+        Ok(())
+    }
+}
+
+#[cfg(all(test, target_os = "linux"))]
+mod tests {
+    use super::validate_event_slot;
+
+    #[test]
+    fn validate_event_slot_allows_in_range() {
+        let events = vec![std::ptr::null_mut(); 2];
+        assert!(validate_event_slot(&events, 0).is_ok());
+        assert!(validate_event_slot(&events, 1).is_ok());
+    }
+
+    #[test]
+    fn validate_event_slot_rejects_out_of_range() {
+        let events = vec![std::ptr::null_mut(); 2];
+        let err = validate_event_slot(&events, 2).unwrap_err();
+        assert!(matches!(err, crate::error::MahoutError::InvalidInput(_)));
     }
 }
 
@@ -140,22 +199,15 @@ impl PipelineContext {
 impl Drop for PipelineContext {
     fn drop(&mut self) {
         unsafe {
-            if !self.event_copy_done.is_null() {
-                cudaEventDestroy(self.event_copy_done);
+            for ev in &mut self.events_copy_done {
+                if !ev.is_null() {
+                    let _ = cudaEventDestroy(*ev);
+                }
             }
         }
     }
 }
 
-/// Chunk processing callback for async pipeline
-///
-/// This closure is called for each chunk with:
-/// - `stream`: The CUDA stream to launch the kernel on
-/// - `input_ptr`: Device pointer to the chunk data (already copied)
-/// - `chunk_offset`: Global offset in the original data (in elements)
-/// - `chunk_len`: Length of this chunk (in elements)
-pub type ChunkProcessor = dyn FnMut(&CudaStream, *const f64, usize, usize) -> 
Result<()>;
-
 /// Executes a task using dual-stream double-buffering pattern
 ///
 /// This function handles the generic pipeline mechanics:
@@ -191,29 +243,29 @@ where
 {
     crate::profile_scope!("GPU::AsyncPipeline");
 
-    // 1. Create dual streams for pipeline overlap
-    let stream1 = device
-        .fork_default_stream()
-        .map_err(|e| MahoutError::Cuda(format!("Failed to create stream 1: 
{:?}", e)))?;
-    let stream2 = device
-        .fork_default_stream()
-        .map_err(|e| MahoutError::Cuda(format!("Failed to create stream 2: 
{:?}", e)))?;
-    let streams = [&stream1, &stream2];
+    // Pinned host staging pool sized to the current chunking strategy (double-buffer by default).
+    const CHUNK_SIZE_ELEMENTS: usize = 8 * 1024 * 1024 / 
std::mem::size_of::<f64>(); // 8MB
+    const PINNED_POOL_SIZE: usize = 2; // double buffering
+    // 1. Create dual streams with per-slot events to coordinate copy -> compute
+    let ctx = PipelineContext::new(device, PINNED_POOL_SIZE)?;
+    let pinned_pool = PinnedBufferPool::new(PINNED_POOL_SIZE, 
CHUNK_SIZE_ELEMENTS)
+        .map_err(|e| MahoutError::Cuda(format!("Failed to create pinned buffer 
pool: {}", e)))?;
 
     // 2. Chunk size: 8MB per chunk (balance between overhead and overlap 
opportunity)
-    // TODO: we should tune this dynamically based on the detected GPU model 
or PCIe bandwidth in the future.
-    // Too small = launch overhead dominates, too large = less overlap
-    const CHUNK_SIZE_ELEMENTS: usize = 8 * 1024 * 1024 / 
std::mem::size_of::<f64>(); // 8MB
+    // TODO: tune dynamically based on GPU/PCIe bandwidth.
 
     // 3. Keep temporary buffers alive until all streams complete
     // This prevents Rust from dropping them while GPU is still using them
     let mut keep_alive_buffers: Vec<CudaSlice<f64>> = Vec::new();
+    // Keep pinned buffers alive until the copy stream has completed their H2D copy
+    let mut in_flight_pinned: Vec<PinnedBufferHandle> = Vec::new();
 
     let mut global_offset = 0;
 
-    // 4. Pipeline loop: alternate between streams for maximum overlap
+    // 4. Pipeline loop: copy on copy stream, compute on compute stream with event handoff
     for (chunk_idx, chunk) in 
host_data.chunks(CHUNK_SIZE_ELEMENTS).enumerate() {
-        let current_stream = streams[chunk_idx % 2];
+        let chunk_offset = global_offset;
+        let event_slot = chunk_idx % PINNED_POOL_SIZE;
 
         crate::profile_scope!("GPU::ChunkProcess");
 
@@ -225,31 +277,33 @@ where
             map_allocation_error(chunk_bytes, "pipeline chunk buffer 
allocation", None, e)
         })?;
 
+        // Acquire pinned staging buffer and populate it with the current chunk
+        let mut pinned_buf = pinned_pool.acquire();
+        pinned_buf.as_slice_mut()[..chunk.len()].copy_from_slice(chunk);
+
         // Async copy: host to device (non-blocking, on specified stream)
         // Uses CUDA Runtime API (cudaMemcpyAsync) for true async copy
         {
             crate::profile_scope!("GPU::H2DCopyAsync");
             unsafe {
-                let dst_device_ptr = *input_chunk_dev.device_ptr() as *mut 
c_void;
-                let src_host_ptr = chunk.as_ptr() as *const c_void;
-                let bytes = std::mem::size_of_val(chunk);
-                let stream_handle = current_stream.stream as *mut c_void;
-
-                let result = cudaMemcpyAsync(
-                    dst_device_ptr,
-                    src_host_ptr,
-                    bytes,
-                    CUDA_MEMCPY_HOST_TO_DEVICE,
-                    stream_handle,
-                );
-
-                if result != 0 {
-                    return Err(MahoutError::Cuda(format!(
-                        "Async H2D copy failed with CUDA error: {}",
-                        result
-                    )));
-                }
+                ctx.async_copy_to_device(
+                    pinned_buf.ptr() as *const c_void,
+                    *input_chunk_dev.device_ptr() as *mut c_void,
+                    chunk.len(),
+                )?;
+                ctx.record_copy_done(event_slot)?;
+                ctx.wait_for_copy(event_slot)?;
+            }
+        }
+
+        // Keep pinned buffer alive until the copy stream is synchronized.
+        in_flight_pinned.push(pinned_buf);
+        if in_flight_pinned.len() == PINNED_POOL_SIZE {
+            // Ensure previous H2D copies are done before reusing buffers.
+            unsafe {
+                ctx.sync_copy_stream()?;
             }
+            in_flight_pinned.clear();
         }
 
         // Get device pointer for kernel launch
@@ -258,7 +312,7 @@ where
         // Invoke caller's kernel launcher (non-blocking)
         {
             crate::profile_scope!("GPU::KernelLaunchAsync");
-            kernel_launcher(current_stream, input_ptr, global_offset, 
chunk.len())?;
+            kernel_launcher(&ctx.stream_compute, input_ptr, chunk_offset, 
chunk.len())?;
         }
 
         // Keep buffer alive until synchronization
@@ -274,12 +328,12 @@ where
     // This ensures all async copies and kernel launches have finished
     {
         crate::profile_scope!("GPU::StreamSync");
+        unsafe {
+            ctx.sync_copy_stream()?;
+        }
         device
-            .wait_for(&stream1)
-            .map_err(|e| MahoutError::Cuda(format!("Stream 1 sync failed: 
{:?}", e)))?;
-        device
-            .wait_for(&stream2)
-            .map_err(|e| MahoutError::Cuda(format!("Stream 2 sync failed: 
{:?}", e)))?;
+            .wait_for(&ctx.stream_compute)
+            .map_err(|e| MahoutError::Cuda(format!("Compute stream sync 
failed: {:?}", e)))?;
     }
 
     // Buffers are dropped here (after sync), freeing GPU memory
diff --git a/qdp/qdp-core/src/lib.rs b/qdp/qdp-core/src/lib.rs
index b8ff42fc6..c1030b809 100644
--- a/qdp/qdp-core/src/lib.rs
+++ b/qdp/qdp-core/src/lib.rs
@@ -35,17 +35,12 @@ use std::sync::mpsc::{Receiver, SyncSender, sync_channel};
 #[cfg(target_os = "linux")]
 use std::thread;
 
-#[cfg(target_os = "linux")]
-type BufferResult = std::result::Result<(PinnedBuffer, usize), MahoutError>;
-#[cfg(target_os = "linux")]
-type BufferChannels = (SyncSender<BufferResult>, Receiver<BufferResult>);
-
 use crate::dlpack::DLManagedTensor;
 #[cfg(target_os = "linux")]
 use crate::gpu::PipelineContext;
 use crate::gpu::get_encoder;
 #[cfg(target_os = "linux")]
-use crate::gpu::memory::{GpuStateVector, PinnedBuffer};
+use crate::gpu::memory::{GpuStateVector, PinnedHostBuffer};
 #[cfg(target_os = "linux")]
 use crate::reader::StreamingDataReader;
 use cudarc::driver::{CudaDevice, DevicePtr, DevicePtrMut};
@@ -57,6 +52,10 @@ use qdp_kernels::{launch_amplitude_encode_batch, 
launch_l2_norm_batch};
 const STAGE_SIZE_BYTES: usize = 512 * 1024 * 1024;
 #[cfg(target_os = "linux")]
 const STAGE_SIZE_ELEMENTS: usize = STAGE_SIZE_BYTES / 
std::mem::size_of::<f64>();
+#[cfg(target_os = "linux")]
+type FullBufferResult = std::result::Result<(PinnedHostBuffer, usize), 
MahoutError>;
+#[cfg(target_os = "linux")]
+type FullBufferChannel = (SyncSender<FullBufferResult>, 
Receiver<FullBufferResult>);
 
 /// Main entry point for Mahout QDP
 ///
@@ -100,7 +99,7 @@ impl QdpEngine {
     /// * `encoding_method` - Strategy: "amplitude", "angle", or "basis"
     ///
     /// # Returns
-    /// DLPack pointer for zero-copy PyTorch integration (shape: [1, 
2^num_qubits])
+    /// DLPack pointer for zero-copy PyTorch integration
     ///
     /// # Safety
     /// Pointer freed by DLPack deleter, do not free manually.
@@ -198,18 +197,21 @@ impl QdpEngine {
 
             let total_state_vector =
                 GpuStateVector::new_batch(&self.device, num_samples, 
num_qubits)?;
-            let ctx = PipelineContext::new(&self.device)?;
+            const PIPELINE_EVENT_SLOTS: usize = 2; // matches double-buffered 
staging buffers
+            let ctx = PipelineContext::new(&self.device, 
PIPELINE_EVENT_SLOTS)?;
 
             let dev_in_a = unsafe { 
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
                 .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", 
e)))?;
             let dev_in_b = unsafe { 
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
                 .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", 
e)))?;
 
-            let (full_buf_tx, full_buf_rx): BufferChannels = sync_channel(2);
-            let (empty_buf_tx, empty_buf_rx): (SyncSender<PinnedBuffer>, 
Receiver<PinnedBuffer>) =
-                sync_channel(2);
+            let (full_buf_tx, full_buf_rx): FullBufferChannel = 
sync_channel(2);
+            let (empty_buf_tx, empty_buf_rx): (
+                SyncSender<PinnedHostBuffer>,
+                Receiver<PinnedHostBuffer>,
+            ) = sync_channel(2);
 
-            let mut host_buf_first = PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?;
+            let mut host_buf_first = 
PinnedHostBuffer::new(STAGE_SIZE_ELEMENTS)?;
             let first_len = 
reader_core.read_chunk(host_buf_first.as_slice_mut())?;
 
             let sample_size = reader_core.get_sample_size().ok_or_else(|| {
@@ -223,23 +225,15 @@ impl QdpEngine {
             }
             if sample_size > STAGE_SIZE_ELEMENTS {
                 return Err(MahoutError::InvalidInput(format!(
-                    "Sample size {} exceeds staging buffer capacity {} 
(elements)",
+                    "Sample size {} exceeds staging buffer capacity {}",
                     sample_size, STAGE_SIZE_ELEMENTS
                 )));
             }
 
-            // Reuse a single norm buffer across chunks to avoid per-chunk 
allocations.
-            //
-            // Important: the norm buffer must outlive the async kernels that 
consume it.
-            // Per-chunk allocation + drop can lead to use-after-free when the 
next chunk
-            // reuses the same device memory while the previous chunk is still 
running.
-            let max_samples_per_chunk = std::cmp::max(
-                1,
-                std::cmp::min(num_samples, STAGE_SIZE_ELEMENTS / sample_size),
-            );
+            let max_samples_in_chunk = STAGE_SIZE_ELEMENTS / sample_size;
             let mut norm_buffer = self
                 .device
-                .alloc_zeros::<f64>(max_samples_per_chunk)
+                .alloc_zeros::<f64>(max_samples_in_chunk)
                 .map_err(|e| {
                     MahoutError::MemoryAllocation(format!(
                         "Failed to allocate norm buffer: {:?}",
@@ -252,7 +246,7 @@ impl QdpEngine {
                 .map_err(|_| MahoutError::Io("Failed to send first 
buffer".into()))?;
 
             empty_buf_tx
-                .send(PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?)
+                .send(PinnedHostBuffer::new(STAGE_SIZE_ELEMENTS)?)
                 .map_err(|_| MahoutError::Io("Failed to send second 
buffer".into()))?;
 
             let mut reader = reader_core;
@@ -306,6 +300,7 @@ impl QdpEngine {
 
                 let samples_in_chunk = current_len / sample_size;
                 if samples_in_chunk > 0 {
+                    let event_slot = if use_dev_a { 0 } else { 1 };
                     let dev_ptr = if use_dev_a {
                         *dev_in_a.device_ptr()
                     } else {
@@ -315,9 +310,13 @@ impl QdpEngine {
                     unsafe {
                         crate::profile_scope!("GPU::Dispatch");
 
-                        ctx.async_copy_to_device(&host_buffer, dev_ptr as *mut 
c_void, current_len);
-                        ctx.record_copy_done();
-                        ctx.wait_for_copy();
+                        ctx.async_copy_to_device(
+                            host_buffer.ptr() as *const c_void,
+                            dev_ptr as *mut c_void,
+                            current_len,
+                        )?;
+                        ctx.record_copy_done(event_slot)?;
+                        ctx.wait_for_copy(event_slot)?;
 
                         {
                             crate::profile_scope!("GPU::BatchEncode");
@@ -345,10 +344,6 @@ impl QdpEngine {
                                 .cast::<u8>()
                                 .add(offset_bytes)
                                 .cast::<std::ffi::c_void>();
-                            debug_assert!(
-                                samples_in_chunk <= max_samples_per_chunk,
-                                "samples_in_chunk must be <= 
max_samples_per_chunk"
-                            );
 
                             {
                                 crate::profile_scope!("GPU::NormBatch");
@@ -387,7 +382,7 @@ impl QdpEngine {
                             }
                         }
 
-                        ctx.sync_copy_stream();
+                        ctx.sync_copy_stream()?;
                     }
                     global_sample_offset = global_sample_offset
                         .checked_add(samples_in_chunk)
diff --git a/qdp/qdp-kernels/tests/amplitude_encode.rs b/qdp/qdp-kernels/tests/amplitude_encode.rs
index f86e00fcb..0d69ca9ee 100644
--- a/qdp/qdp-kernels/tests/amplitude_encode.rs
+++ b/qdp/qdp-kernels/tests/amplitude_encode.rs
@@ -509,9 +509,9 @@ fn test_amplitude_encode_small_input_large_state() {
     assert!((state_h[1].x - 0.8).abs() < EPSILON);
 
     // Rest should be zero
-    for (i, item) in state_h.iter().enumerate().take(state_len).skip(2) {
+    for (i, value) in state_h.iter().enumerate().skip(2) {
         assert!(
-            item.x.abs() < EPSILON && item.y.abs() < EPSILON,
+            value.x.abs() < EPSILON && value.y.abs() < EPSILON,
             "Element {} should be zero-padded",
             i
         );


Reply via email to