This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new b53453a3 fix(io_uring): fix buffer reading with memory pool enabled
(#1964)
b53453a3 is described below
commit b53453a3d5664387a58d28a22a8a209d273641fa
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Sat Jul 5 19:25:59 2025 +0200
fix(io_uring): fix buffer reading with memory pool enabled (#1964)
Replace unsafe set_len() with slice() for reading exact buffer sizes.
This fixes hangs when the memory pool returns larger buffers than requested.
This follows compio's recommended pattern: buffer.slice(0..exact_size).
---
.../handlers/messages/send_messages_handler.rs | 29 +++++++++++-----------
core/server/src/streaming/utils/memory_pool.rs | 15 +++--------
core/server/src/streaming/utils/pooled_buffer.rs | 2 +-
3 files changed, 19 insertions(+), 27 deletions(-)
diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs
b/core/server/src/binary/handlers/messages/send_messages_handler.rs
index 47df8270..bb536c61 100644
--- a/core/server/src/binary/handlers/messages/send_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs
@@ -27,7 +27,7 @@ use crate::streaming::segments::{IggyIndexesMut,
IggyMessagesBatchMut};
use crate::streaming::session::Session;
use crate::streaming::utils::PooledBuffer;
use anyhow::Result;
-use bytes::BytesMut;
+use compio::buf::{IntoInner as _, IoBuf};
use iggy_common::Identifier;
use iggy_common::Sizeable;
use iggy_common::{INDEX_SIZE, IdKind};
@@ -57,17 +57,18 @@ impl ServerCommandHandler for SendMessages {
let total_payload_size = length as usize - std::mem::size_of::<u32>();
let metadata_len_field_size = std::mem::size_of::<u32>();
- let mut metadata_length_buffer = BytesMut::with_capacity(4);
- unsafe { metadata_length_buffer.set_len(4) };
- let (result, metadata_len_buf) =
sender.read(metadata_length_buffer).await;
+ let metadata_length_buffer = PooledBuffer::with_capacity(4);
+ let (result, metadata_len_buf) =
sender.read(metadata_length_buffer.slice(0..4)).await;
+ let metadata_len_buf = metadata_len_buf.into_inner();
result?;
- let metadata_len_buf = metadata_len_buf.freeze();
let metadata_size =
u32::from_le_bytes(metadata_len_buf[..].try_into().unwrap());
- let mut metadata_buffer = PooledBuffer::with_capacity(metadata_size as
usize);
- unsafe { metadata_buffer.set_len(metadata_size as usize) };
- let (result, metadata_buf) = sender.read(metadata_buffer).await;
+ let metadata_buffer = PooledBuffer::with_capacity(metadata_size as
usize);
+ let (result, metadata_buf) = sender
+ .read(metadata_buffer.slice(0..metadata_size as usize))
+ .await;
result?;
+ let metadata_buf = metadata_buf.into_inner();
let mut element_size = 0;
@@ -90,17 +91,17 @@ impl ServerCommandHandler for SendMessages {
);
let indexes_size = messages_count as usize * INDEX_SIZE;
- let mut indexes_buffer = PooledBuffer::with_capacity(indexes_size);
- unsafe { indexes_buffer.set_len(indexes_size) };
- let (result, indexes_buffer) = sender.read(indexes_buffer).await;
+ let indexes_buffer = PooledBuffer::with_capacity(indexes_size);
+ let (result, indexes_buffer) =
sender.read(indexes_buffer.slice(0..indexes_size)).await;
result?;
+ let indexes_buffer = indexes_buffer.into_inner();
let messages_size =
total_payload_size - metadata_size as usize - indexes_size -
metadata_len_field_size;
- let mut messages_buffer = PooledBuffer::with_capacity(messages_size);
- unsafe { messages_buffer.set_len(messages_size) };
- let (result, messages_buffer) = sender.read(messages_buffer).await;
+ let messages_buffer = PooledBuffer::with_capacity(messages_size);
+ let (result, messages_buffer) =
sender.read(messages_buffer.slice(0..messages_size)).await;
result?;
+ let messages_buffer = messages_buffer.into_inner();
let indexes = IggyIndexesMut::from_bytes(indexes_buffer, 0);
let batch = IggyMessagesBatchMut::from_indexes_and_messages(
diff --git a/core/server/src/streaming/utils/memory_pool.rs
b/core/server/src/streaming/utils/memory_pool.rs
index 16ad2087..675c0076 100644
--- a/core/server/src/streaming/utils/memory_pool.rs
+++ b/core/server/src/streaming/utils/memory_pool.rs
@@ -155,21 +155,12 @@ impl MemoryPool {
/// Initialize the global pool from the given config.
pub fn init_pool(config: Arc<SystemConfig>) {
- // TODO: Fixme (stary napraw).
- let is_enabled = false;
+ let is_enabled = config.memory_pool.enabled;
let memory_limit = config.memory_pool.size.as_bytes_usize();
let bucket_capacity = config.memory_pool.bucket_capacity as usize;
- let pool = MemoryPool::new(is_enabled, memory_limit, bucket_capacity);
- if MEMORY_POOL.set(pool).is_err() {
- warn!("Memory pool already initialized.");
- // This shouldn't ever happen in production code, only in tests
- // if someone forgets to add #[serial] tag to tests that have
different
- // memory pool limits (different instances are created within same
executable).
- if memory_pool().memory_limit != memory_limit {
- panic!("Previously initialized memory pool has a different
limit.");
- }
- }
+ let _ =
+ MEMORY_POOL.get_or_init(|| MemoryPool::new(is_enabled,
memory_limit, bucket_capacity));
}
/// Acquire a `BytesMut` buffer with at least `capacity` bytes.
diff --git a/core/server/src/streaming/utils/pooled_buffer.rs
b/core/server/src/streaming/utils/pooled_buffer.rs
index 39ef25be..6bdaa357 100644
--- a/core/server/src/streaming/utils/pooled_buffer.rs
+++ b/core/server/src/streaming/utils/pooled_buffer.rs
@@ -229,7 +229,7 @@ impl SetBufInit for PooledBuffer {
unsafe impl IoBufMut for PooledBuffer {
fn as_buf_mut_ptr(&mut self) -> *mut u8 {
- self.inner.as_buf_mut_ptr()
+ self.inner.as_mut_ptr()
}
}