Copilot commented on code in PR #751:
URL: https://github.com/apache/mahout/pull/751#discussion_r2656461871
##########
qdp/qdp-core/src/gpu/pipeline.rs:
##########
@@ -171,32 +252,40 @@ where
{
crate::profile_scope!("GPU::AsyncPipeline");
- // 1. Create dual streams for pipeline overlap
- let stream1 = device
- .fork_default_stream()
- .map_err(|e| MahoutError::Cuda(format!("Failed to create stream 1:
{:?}", e)))?;
- let stream2 = device
- .fork_default_stream()
- .map_err(|e| MahoutError::Cuda(format!("Failed to create stream 2:
{:?}", e)))?;
- let streams = [&stream1, &stream2];
+ // Pinned host staging pool sized to the current chunking strategy
(double-buffer by default).
+ const CHUNK_SIZE_ELEMENTS: usize = 8 * 1024 * 1024 /
std::mem::size_of::<f64>(); // 8MB
+ const PINNED_POOL_SIZE: usize = 2; // double buffering
+ // 1. Create dual streams with per-slot events to coordinate copy ->
compute
+ let ctx = PipelineContext::new(device, PINNED_POOL_SIZE)?;
+ let pinned_pool = PinnedBufferPool::new(PINNED_POOL_SIZE,
CHUNK_SIZE_ELEMENTS)
+ .map_err(|e| MahoutError::Cuda(format!("Failed to create pinned buffer
pool: {}", e)))?;
// 2. Chunk size: 8MB per chunk (balance between overhead and overlap
opportunity)
- // TODO: we should tune this dynamically based on the detected GPU model
or PCIe bandwidth in the future.
- // Too small = launch overhead dominates, too large = less overlap
- const CHUNK_SIZE_ELEMENTS: usize = 8 * 1024 * 1024 /
std::mem::size_of::<f64>(); // 8MB
+ // TODO: tune dynamically based on GPU/PCIe bandwidth.
// 3. Keep temporary buffers alive until all streams complete
// This prevents Rust from dropping them while GPU is still using them
let mut keep_alive_buffers: Vec<CudaSlice<f64>> = Vec::new();
+ // Keep pinned buffers alive until the copy stream has completed their H2D
copy
+ let mut in_flight_pinned: Vec<PinnedBufferHandle> = Vec::new();
let mut global_offset = 0;
- // 4. Pipeline loop: alternate between streams for maximum overlap
+ // 4. Pipeline loop: copy on copy stream, compute on compute stream with
event handoff
for (chunk_idx, chunk) in
host_data.chunks(CHUNK_SIZE_ELEMENTS).enumerate() {
- let current_stream = streams[chunk_idx % 2];
+ let chunk_offset = global_offset;
+ let event_slot = chunk_idx % PINNED_POOL_SIZE;
crate::profile_scope!("GPU::ChunkProcess");
+ if chunk.len() > CHUNK_SIZE_ELEMENTS {
+ return Err(MahoutError::InvalidInput(format!(
+ "Chunk size {} exceeds pinned buffer capacity {}",
+ chunk.len(),
+ CHUNK_SIZE_ELEMENTS
+ )));
+ }
+
Review Comment:
This check is redundant because the iterator `chunks()` will never produce a
chunk larger than `CHUNK_SIZE_ELEMENTS`. The check can be removed to simplify
the code.
```suggestion
```
##########
qdp/qdp-core/src/gpu/buffer_pool.rs:
##########
@@ -0,0 +1,141 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Reusable pool of pinned host buffers for staging Disk → Host → GPU
transfers.
+//! Intended for producer/consumer pipelines that need a small, fixed set of
+//! page-locked buffers to avoid repeated cudaHostAlloc / cudaFreeHost.
+
+use std::sync::{Arc, Condvar, Mutex};
+
+use crate::error::{MahoutError, Result};
+use crate::gpu::memory::PinnedHostBuffer;
+
+/// Handle that automatically returns a buffer to the pool on drop.
+#[cfg(target_os = "linux")]
+pub struct PinnedBufferHandle {
+ buffer: Option<PinnedHostBuffer>,
+ pool: Arc<PinnedBufferPool>,
+}
+
+#[cfg(target_os = "linux")]
+impl std::ops::Deref for PinnedBufferHandle {
+ type Target = PinnedHostBuffer;
+
+ fn deref(&self) -> &Self::Target {
+ self.buffer
+ .as_ref()
+ .expect("Buffer already returned to pool")
+ }
+}
+
+#[cfg(target_os = "linux")]
+impl std::ops::DerefMut for PinnedBufferHandle {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ self.buffer
+ .as_mut()
+ .expect("Buffer already returned to pool")
+ }
+}
+
+#[cfg(target_os = "linux")]
+impl Drop for PinnedBufferHandle {
+ fn drop(&mut self) {
+ if let Some(buf) = self.buffer.take() {
+ let mut free = self.pool.free.lock().unwrap();
+ free.push(buf);
+ self.pool.available_cv.notify_one();
+ }
+ }
+}
+
+/// Pool of pinned host buffers sized for a fixed batch shape.
+#[cfg(target_os = "linux")]
+pub struct PinnedBufferPool {
+ free: Mutex<Vec<PinnedHostBuffer>>,
+ available_cv: Condvar,
+ capacity: usize,
+ elements_per_buffer: usize,
+}
+
+#[cfg(target_os = "linux")]
+impl PinnedBufferPool {
+ /// Create a pool with `pool_size` pinned buffers, each sized for
`elements_per_buffer` f64 values.
+ pub fn new(pool_size: usize, elements_per_buffer: usize) ->
Result<Arc<Self>> {
+ if pool_size == 0 {
+ return Err(MahoutError::InvalidInput(
+ "PinnedBufferPool requires at least one buffer".to_string(),
+ ));
+ }
+ if elements_per_buffer == 0 {
+ return Err(MahoutError::InvalidInput(
+ "PinnedBufferPool buffer size must be greater than
zero".to_string(),
+ ));
+ }
+
+ let mut buffers = Vec::with_capacity(pool_size);
+ for _ in 0..pool_size {
+ buffers.push(PinnedHostBuffer::new(elements_per_buffer)?);
+ }
+
+ Ok(Arc::new(Self {
+ free: Mutex::new(buffers),
+ available_cv: Condvar::new(),
+ capacity: pool_size,
+ elements_per_buffer,
+ }))
+ }
+
+ /// Acquire a pinned buffer, blocking until one is available.
+ pub fn acquire(self: &Arc<Self>) -> PinnedBufferHandle {
+ let mut free = self.free.lock().unwrap();
+ loop {
+ if let Some(buffer) = free.pop() {
+ return PinnedBufferHandle {
+ buffer: Some(buffer),
+ pool: Arc::clone(self),
+ };
+ }
+ free = self.available_cv.wait(free).unwrap();
+ }
+ }
+
+ /// Try to acquire a pinned buffer from the pool.
+ ///
+ /// Returns `None` if the pool is currently empty; callers can choose to
spin/wait
+ /// or fall back to synchronous paths.
+ pub fn try_acquire(self: &Arc<Self>) -> Option<PinnedBufferHandle> {
+ let mut free = self.free.lock().unwrap();
+ free.pop().map(|buffer| PinnedBufferHandle {
+ buffer: Some(buffer),
+ pool: Arc::clone(self),
+ })
+ }
+
+ /// Number of buffers currently available.
+ pub fn available(&self) -> usize {
+ self.free.lock().unwrap().len()
Review Comment:
The `.unwrap()` call on the mutex lock operation can cause a panic if the
mutex is poisoned. Consider handling this error more gracefully or documenting
the panic behavior.
##########
qdp/qdp-core/src/gpu/buffer_pool.rs:
##########
@@ -0,0 +1,141 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Reusable pool of pinned host buffers for staging Disk → Host → GPU
transfers.
+//! Intended for producer/consumer pipelines that need a small, fixed set of
+//! page-locked buffers to avoid repeated cudaHostAlloc / cudaFreeHost.
+
+use std::sync::{Arc, Condvar, Mutex};
+
+use crate::error::{MahoutError, Result};
+use crate::gpu::memory::PinnedHostBuffer;
+
+/// Handle that automatically returns a buffer to the pool on drop.
+#[cfg(target_os = "linux")]
+pub struct PinnedBufferHandle {
+ buffer: Option<PinnedHostBuffer>,
+ pool: Arc<PinnedBufferPool>,
+}
+
+#[cfg(target_os = "linux")]
+impl std::ops::Deref for PinnedBufferHandle {
+ type Target = PinnedHostBuffer;
+
+ fn deref(&self) -> &Self::Target {
+ self.buffer
+ .as_ref()
+ .expect("Buffer already returned to pool")
+ }
+}
+
+#[cfg(target_os = "linux")]
+impl std::ops::DerefMut for PinnedBufferHandle {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ self.buffer
+ .as_mut()
+ .expect("Buffer already returned to pool")
Review Comment:
The panic message "Buffer already returned to pool" may be misleading. This
panic occurs when attempting to use a `PinnedBufferHandle` after it has been
dropped and its buffer returned to the pool. Consider a more descriptive
message such as "Attempted to use PinnedBufferHandle after buffer was returned
to pool (use-after-drop)" to better indicate the programmer error.
##########
qdp/qdp-core/src/lib.rs:
##########
@@ -343,10 +327,16 @@ impl QdpEngine {
.cast::<u8>()
.add(offset_bytes)
.cast::<std::ffi::c_void>();
- debug_assert!(
- samples_in_chunk <= max_samples_per_chunk,
- "samples_in_chunk must be <=
max_samples_per_chunk"
- );
+
+ let mut norm_buffer = self
+ .device
+ .alloc_zeros::<f64>(samples_in_chunk)
+ .map_err(|e| {
+ MahoutError::MemoryAllocation(format!(
+ "Failed to allocate norm buffer: {:?}",
+ e
+ ))
+ })?;
Review Comment:
Moving the norm buffer allocation inside the per-chunk loop (line 331-339)
reintroduces per-chunk allocation overhead that was previously avoided. The old
code comment explicitly stated: "Reuse a single norm buffer across chunks to
avoid per-chunk allocations" and warned that "per-chunk allocation + drop can
lead to use-after-free when the next chunk reuses the same device memory while
the previous chunk is still running." While the use-after-free concern is
mitigated by proper stream synchronization, the per-chunk allocation overhead
remains. Consider pre-allocating a single norm buffer sized for the maximum
expected samples_in_chunk to improve performance.
##########
qdp/qdp-core/src/gpu/buffer_pool.rs:
##########
@@ -0,0 +1,141 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Reusable pool of pinned host buffers for staging Disk → Host → GPU
transfers.
+//! Intended for producer/consumer pipelines that need a small, fixed set of
+//! page-locked buffers to avoid repeated cudaHostAlloc / cudaFreeHost.
+
+use std::sync::{Arc, Condvar, Mutex};
+
+use crate::error::{MahoutError, Result};
+use crate::gpu::memory::PinnedHostBuffer;
+
+/// Handle that automatically returns a buffer to the pool on drop.
+#[cfg(target_os = "linux")]
+pub struct PinnedBufferHandle {
+ buffer: Option<PinnedHostBuffer>,
+ pool: Arc<PinnedBufferPool>,
+}
+
+#[cfg(target_os = "linux")]
+impl std::ops::Deref for PinnedBufferHandle {
+ type Target = PinnedHostBuffer;
+
+ fn deref(&self) -> &Self::Target {
+ self.buffer
+ .as_ref()
+ .expect("Buffer already returned to pool")
+ }
+}
+
+#[cfg(target_os = "linux")]
+impl std::ops::DerefMut for PinnedBufferHandle {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ self.buffer
+ .as_mut()
+ .expect("Buffer already returned to pool")
+ }
+}
+
+#[cfg(target_os = "linux")]
+impl Drop for PinnedBufferHandle {
+ fn drop(&mut self) {
+ if let Some(buf) = self.buffer.take() {
+ let mut free = self.pool.free.lock().unwrap();
+ free.push(buf);
+ self.pool.available_cv.notify_one();
+ }
+ }
+}
+
+/// Pool of pinned host buffers sized for a fixed batch shape.
+#[cfg(target_os = "linux")]
+pub struct PinnedBufferPool {
+ free: Mutex<Vec<PinnedHostBuffer>>,
+ available_cv: Condvar,
+ capacity: usize,
+ elements_per_buffer: usize,
+}
+
+#[cfg(target_os = "linux")]
+impl PinnedBufferPool {
+ /// Create a pool with `pool_size` pinned buffers, each sized for
`elements_per_buffer` f64 values.
+ pub fn new(pool_size: usize, elements_per_buffer: usize) ->
Result<Arc<Self>> {
+ if pool_size == 0 {
+ return Err(MahoutError::InvalidInput(
+ "PinnedBufferPool requires at least one buffer".to_string(),
+ ));
+ }
+ if elements_per_buffer == 0 {
+ return Err(MahoutError::InvalidInput(
+ "PinnedBufferPool buffer size must be greater than
zero".to_string(),
+ ));
+ }
+
+ let mut buffers = Vec::with_capacity(pool_size);
+ for _ in 0..pool_size {
+ buffers.push(PinnedHostBuffer::new(elements_per_buffer)?);
+ }
+
+ Ok(Arc::new(Self {
+ free: Mutex::new(buffers),
+ available_cv: Condvar::new(),
+ capacity: pool_size,
+ elements_per_buffer,
+ }))
+ }
+
+ /// Acquire a pinned buffer, blocking until one is available.
+ pub fn acquire(self: &Arc<Self>) -> PinnedBufferHandle {
+ let mut free = self.free.lock().unwrap();
+ loop {
+ if let Some(buffer) = free.pop() {
+ return PinnedBufferHandle {
+ buffer: Some(buffer),
+ pool: Arc::clone(self),
+ };
+ }
+ free = self.available_cv.wait(free).unwrap();
+ }
+ }
+
+ /// Try to acquire a pinned buffer from the pool.
+ ///
+ /// Returns `None` if the pool is currently empty; callers can choose to
spin/wait
+ /// or fall back to synchronous paths.
+ pub fn try_acquire(self: &Arc<Self>) -> Option<PinnedBufferHandle> {
+ let mut free = self.free.lock().unwrap();
Review Comment:
The `.unwrap()` call on the mutex lock operation can cause a panic if the
mutex is poisoned. Consider handling this error more gracefully or documenting
the panic behavior.
##########
qdp/qdp-core/src/gpu/buffer_pool.rs:
##########
@@ -0,0 +1,141 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Reusable pool of pinned host buffers for staging Disk → Host → GPU
transfers.
+//! Intended for producer/consumer pipelines that need a small, fixed set of
+//! page-locked buffers to avoid repeated cudaHostAlloc / cudaFreeHost.
+
+use std::sync::{Arc, Condvar, Mutex};
+
+use crate::error::{MahoutError, Result};
+use crate::gpu::memory::PinnedHostBuffer;
+
+/// Handle that automatically returns a buffer to the pool on drop.
+#[cfg(target_os = "linux")]
+pub struct PinnedBufferHandle {
+ buffer: Option<PinnedHostBuffer>,
+ pool: Arc<PinnedBufferPool>,
+}
+
+#[cfg(target_os = "linux")]
+impl std::ops::Deref for PinnedBufferHandle {
+ type Target = PinnedHostBuffer;
+
+ fn deref(&self) -> &Self::Target {
+ self.buffer
+ .as_ref()
+ .expect("Buffer already returned to pool")
+ }
+}
+
+#[cfg(target_os = "linux")]
+impl std::ops::DerefMut for PinnedBufferHandle {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ self.buffer
+ .as_mut()
+ .expect("Buffer already returned to pool")
+ }
+}
+
+#[cfg(target_os = "linux")]
+impl Drop for PinnedBufferHandle {
+ fn drop(&mut self) {
+ if let Some(buf) = self.buffer.take() {
+ let mut free = self.pool.free.lock().unwrap();
Review Comment:
The `.unwrap()` call on the mutex lock operation can cause a panic if the
mutex is poisoned. Consider handling this error more gracefully or documenting
the panic behavior.
```suggestion
let mut free = match self.pool.free.lock() {
Ok(guard) => guard,
Err(poisoned) => poisoned.into_inner(),
};
```
##########
qdp/qdp-core/src/gpu/buffer_pool.rs:
##########
@@ -0,0 +1,141 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Reusable pool of pinned host buffers for staging Disk → Host → GPU
transfers.
+//! Intended for producer/consumer pipelines that need a small, fixed set of
+//! page-locked buffers to avoid repeated cudaHostAlloc / cudaFreeHost.
+
+use std::sync::{Arc, Condvar, Mutex};
+
+use crate::error::{MahoutError, Result};
+use crate::gpu::memory::PinnedHostBuffer;
+
+/// Handle that automatically returns a buffer to the pool on drop.
+#[cfg(target_os = "linux")]
+pub struct PinnedBufferHandle {
+ buffer: Option<PinnedHostBuffer>,
+ pool: Arc<PinnedBufferPool>,
+}
+
+#[cfg(target_os = "linux")]
+impl std::ops::Deref for PinnedBufferHandle {
+ type Target = PinnedHostBuffer;
+
+ fn deref(&self) -> &Self::Target {
+ self.buffer
+ .as_ref()
+ .expect("Buffer already returned to pool")
+ }
+}
+
+#[cfg(target_os = "linux")]
+impl std::ops::DerefMut for PinnedBufferHandle {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ self.buffer
+ .as_mut()
+ .expect("Buffer already returned to pool")
+ }
+}
+
+#[cfg(target_os = "linux")]
+impl Drop for PinnedBufferHandle {
+ fn drop(&mut self) {
+ if let Some(buf) = self.buffer.take() {
+ let mut free = self.pool.free.lock().unwrap();
+ free.push(buf);
+ self.pool.available_cv.notify_one();
+ }
+ }
+}
+
+/// Pool of pinned host buffers sized for a fixed batch shape.
+#[cfg(target_os = "linux")]
+pub struct PinnedBufferPool {
+ free: Mutex<Vec<PinnedHostBuffer>>,
+ available_cv: Condvar,
+ capacity: usize,
+ elements_per_buffer: usize,
+}
+
+#[cfg(target_os = "linux")]
+impl PinnedBufferPool {
+ /// Create a pool with `pool_size` pinned buffers, each sized for
`elements_per_buffer` f64 values.
+ pub fn new(pool_size: usize, elements_per_buffer: usize) ->
Result<Arc<Self>> {
+ if pool_size == 0 {
+ return Err(MahoutError::InvalidInput(
+ "PinnedBufferPool requires at least one buffer".to_string(),
+ ));
+ }
+ if elements_per_buffer == 0 {
+ return Err(MahoutError::InvalidInput(
+ "PinnedBufferPool buffer size must be greater than
zero".to_string(),
+ ));
+ }
+
+ let mut buffers = Vec::with_capacity(pool_size);
+ for _ in 0..pool_size {
+ buffers.push(PinnedHostBuffer::new(elements_per_buffer)?);
+ }
+
+ Ok(Arc::new(Self {
+ free: Mutex::new(buffers),
+ available_cv: Condvar::new(),
+ capacity: pool_size,
+ elements_per_buffer,
+ }))
+ }
+
+ /// Acquire a pinned buffer, blocking until one is available.
+ pub fn acquire(self: &Arc<Self>) -> PinnedBufferHandle {
+ let mut free = self.free.lock().unwrap();
+ loop {
+ if let Some(buffer) = free.pop() {
+ return PinnedBufferHandle {
+ buffer: Some(buffer),
+ pool: Arc::clone(self),
+ };
+ }
+ free = self.available_cv.wait(free).unwrap();
Review Comment:
The `.unwrap()` calls on mutex lock operations can cause panics if the mutex
is poisoned. In a production system, poisoned mutex errors should be handled
more gracefully, either by propagating the error or by documenting that panic
behavior is intentional in these scenarios.
##########
qdp/qdp-core/src/gpu/buffer_pool.rs:
##########
@@ -0,0 +1,141 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Reusable pool of pinned host buffers for staging Disk → Host → GPU
transfers.
+//! Intended for producer/consumer pipelines that need a small, fixed set of
+//! page-locked buffers to avoid repeated cudaHostAlloc / cudaFreeHost.
+
+use std::sync::{Arc, Condvar, Mutex};
+
+use crate::error::{MahoutError, Result};
+use crate::gpu::memory::PinnedHostBuffer;
+
+/// Handle that automatically returns a buffer to the pool on drop.
+#[cfg(target_os = "linux")]
+pub struct PinnedBufferHandle {
+ buffer: Option<PinnedHostBuffer>,
+ pool: Arc<PinnedBufferPool>,
+}
+
+#[cfg(target_os = "linux")]
+impl std::ops::Deref for PinnedBufferHandle {
+ type Target = PinnedHostBuffer;
+
+ fn deref(&self) -> &Self::Target {
+ self.buffer
+ .as_ref()
+ .expect("Buffer already returned to pool")
+ }
+}
+
+#[cfg(target_os = "linux")]
+impl std::ops::DerefMut for PinnedBufferHandle {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ self.buffer
+ .as_mut()
+ .expect("Buffer already returned to pool")
+ }
+}
+
+#[cfg(target_os = "linux")]
+impl Drop for PinnedBufferHandle {
+ fn drop(&mut self) {
+ if let Some(buf) = self.buffer.take() {
+ let mut free = self.pool.free.lock().unwrap();
+ free.push(buf);
+ self.pool.available_cv.notify_one();
+ }
+ }
+}
+
+/// Pool of pinned host buffers sized for a fixed batch shape.
+#[cfg(target_os = "linux")]
+pub struct PinnedBufferPool {
+ free: Mutex<Vec<PinnedHostBuffer>>,
+ available_cv: Condvar,
+ capacity: usize,
+ elements_per_buffer: usize,
+}
+
+#[cfg(target_os = "linux")]
+impl PinnedBufferPool {
+ /// Create a pool with `pool_size` pinned buffers, each sized for
`elements_per_buffer` f64 values.
+ pub fn new(pool_size: usize, elements_per_buffer: usize) ->
Result<Arc<Self>> {
+ if pool_size == 0 {
+ return Err(MahoutError::InvalidInput(
+ "PinnedBufferPool requires at least one buffer".to_string(),
+ ));
+ }
+ if elements_per_buffer == 0 {
+ return Err(MahoutError::InvalidInput(
+ "PinnedBufferPool buffer size must be greater than
zero".to_string(),
+ ));
+ }
+
+ let mut buffers = Vec::with_capacity(pool_size);
+ for _ in 0..pool_size {
+ buffers.push(PinnedHostBuffer::new(elements_per_buffer)?);
+ }
+
+ Ok(Arc::new(Self {
+ free: Mutex::new(buffers),
+ available_cv: Condvar::new(),
+ capacity: pool_size,
+ elements_per_buffer,
+ }))
+ }
+
+ /// Acquire a pinned buffer, blocking until one is available.
+ pub fn acquire(self: &Arc<Self>) -> PinnedBufferHandle {
+ let mut free = self.free.lock().unwrap();
+ loop {
+ if let Some(buffer) = free.pop() {
+ return PinnedBufferHandle {
+ buffer: Some(buffer),
+ pool: Arc::clone(self),
+ };
+ }
+ free = self.available_cv.wait(free).unwrap();
+ }
+ }
+
+ /// Try to acquire a pinned buffer from the pool.
+ ///
+ /// Returns `None` if the pool is currently empty; callers can choose to
spin/wait
+ /// or fall back to synchronous paths.
+ pub fn try_acquire(self: &Arc<Self>) -> Option<PinnedBufferHandle> {
+ let mut free = self.free.lock().unwrap();
+ free.pop().map(|buffer| PinnedBufferHandle {
+ buffer: Some(buffer),
+ pool: Arc::clone(self),
+ })
+ }
+
+ /// Number of buffers currently available.
+ pub fn available(&self) -> usize {
+ self.free.lock().unwrap().len()
+ }
+
+ /// Total number of buffers managed by this pool.
+ pub fn capacity(&self) -> usize {
+ self.capacity
+ }
+
+ /// Fixed element capacity for each buffer in the pool.
+ pub fn elements_per_buffer(&self) -> usize {
+ self.elements_per_buffer
+ }
+}
Review Comment:
The new `PinnedBufferPool` implementation lacks test coverage. Consider
adding tests to verify buffer acquisition/release patterns, concurrent access
behavior, pool exhaustion scenarios, and error handling (e.g., zero pool size,
zero buffer size).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]