rich7420 commented on code in PR #751:
URL: https://github.com/apache/mahout/pull/751#discussion_r2658918782
##########
qdp/qdp-core/src/gpu/pipeline.rs:
##########
@@ -191,32 +252,40 @@ where
{
crate::profile_scope!("GPU::AsyncPipeline");
- // 1. Create dual streams for pipeline overlap
- let stream1 = device
- .fork_default_stream()
- .map_err(|e| MahoutError::Cuda(format!("Failed to create stream 1:
{:?}", e)))?;
- let stream2 = device
- .fork_default_stream()
- .map_err(|e| MahoutError::Cuda(format!("Failed to create stream 2:
{:?}", e)))?;
- let streams = [&stream1, &stream2];
+ // Pinned host staging pool sized to the current chunking strategy
(double-buffer by default).
+ const CHUNK_SIZE_ELEMENTS: usize = 8 * 1024 * 1024 /
std::mem::size_of::<f64>(); // 8MB
+ const PINNED_POOL_SIZE: usize = 2; // double buffering
+ // 1. Create dual streams with per-slot events to coordinate copy ->
compute
+ let ctx = PipelineContext::new(device, PINNED_POOL_SIZE)?;
+ let pinned_pool = PinnedBufferPool::new(PINNED_POOL_SIZE,
CHUNK_SIZE_ELEMENTS)
+ .map_err(|e| MahoutError::Cuda(format!("Failed to create pinned buffer
pool: {}", e)))?;
// 2. Chunk size: 8MB per chunk (balance between overhead and overlap
opportunity)
- // TODO: we should tune this dynamically based on the detected GPU model
or PCIe bandwidth in the future.
- // Too small = launch overhead dominates, too large = less overlap
- const CHUNK_SIZE_ELEMENTS: usize = 8 * 1024 * 1024 /
std::mem::size_of::<f64>(); // 8MB
+ // TODO: tune dynamically based on GPU/PCIe bandwidth.
// 3. Keep temporary buffers alive until all streams complete
// This prevents Rust from dropping them while GPU is still using them
let mut keep_alive_buffers: Vec<CudaSlice<f64>> = Vec::new();
+ // Keep pinned buffers alive until the copy stream has completed their H2D
copy
+ let mut in_flight_pinned: Vec<PinnedBufferHandle> = Vec::new();
let mut global_offset = 0;
- // 4. Pipeline loop: alternate between streams for maximum overlap
+ // 4. Pipeline loop: copy on copy stream, compute on compute stream with
event handoff
for (chunk_idx, chunk) in
host_data.chunks(CHUNK_SIZE_ELEMENTS).enumerate() {
- let current_stream = streams[chunk_idx % 2];
+ let chunk_offset = global_offset;
+ let event_slot = chunk_idx % PINNED_POOL_SIZE;
crate::profile_scope!("GPU::ChunkProcess");
+ if chunk.len() > CHUNK_SIZE_ELEMENTS {
Review Comment:
I think this function would never be used; it should be removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]