This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/io_uring_tpc by this push:
     new b53453a3 fix(io_uring): fix buffer reading with memory pool enabled (#1964)
b53453a3 is described below

commit b53453a3d5664387a58d28a22a8a209d273641fa
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Sat Jul 5 19:25:59 2025 +0200

    fix(io_uring): fix buffer reading with memory pool enabled (#1964)
    
    Replace unsafe set_len() with slice() for reading exact buffer sizes.
    This fixes hangs when memory pool returns larger buffers than requested.
    Following compio's recommended pattern: buffer.slice(0..exact_size).
---
 .../handlers/messages/send_messages_handler.rs     | 29 +++++++++++-----------
 core/server/src/streaming/utils/memory_pool.rs     | 15 +++--------
 core/server/src/streaming/utils/pooled_buffer.rs   |  2 +-
 3 files changed, 19 insertions(+), 27 deletions(-)

diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs b/core/server/src/binary/handlers/messages/send_messages_handler.rs
index 47df8270..bb536c61 100644
--- a/core/server/src/binary/handlers/messages/send_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs
@@ -27,7 +27,7 @@ use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut};
 use crate::streaming::session::Session;
 use crate::streaming::utils::PooledBuffer;
 use anyhow::Result;
-use bytes::BytesMut;
+use compio::buf::{IntoInner as _, IoBuf};
 use iggy_common::Identifier;
 use iggy_common::Sizeable;
 use iggy_common::{INDEX_SIZE, IdKind};
@@ -57,17 +57,18 @@ impl ServerCommandHandler for SendMessages {
         let total_payload_size = length as usize - std::mem::size_of::<u32>();
         let metadata_len_field_size = std::mem::size_of::<u32>();
 
-        let mut metadata_length_buffer = BytesMut::with_capacity(4);
-        unsafe { metadata_length_buffer.set_len(4) };
-        let (result, metadata_len_buf) = sender.read(metadata_length_buffer).await;
+        let metadata_length_buffer = PooledBuffer::with_capacity(4);
+        let (result, metadata_len_buf) = sender.read(metadata_length_buffer.slice(0..4)).await;
+        let metadata_len_buf = metadata_len_buf.into_inner();
         result?;
-        let metadata_len_buf = metadata_len_buf.freeze();
         let metadata_size = u32::from_le_bytes(metadata_len_buf[..].try_into().unwrap());
 
-        let mut metadata_buffer = PooledBuffer::with_capacity(metadata_size as usize);
-        unsafe { metadata_buffer.set_len(metadata_size as usize) };
-        let (result, metadata_buf) = sender.read(metadata_buffer).await;
+        let metadata_buffer = PooledBuffer::with_capacity(metadata_size as usize);
+        let (result, metadata_buf) = sender
+            .read(metadata_buffer.slice(0..metadata_size as usize))
+            .await;
         result?;
+        let metadata_buf = metadata_buf.into_inner();
 
         let mut element_size = 0;
 
@@ -90,17 +91,17 @@ impl ServerCommandHandler for SendMessages {
         );
         let indexes_size = messages_count as usize * INDEX_SIZE;
 
-        let mut indexes_buffer = PooledBuffer::with_capacity(indexes_size);
-        unsafe { indexes_buffer.set_len(indexes_size) };
-        let (result, indexes_buffer) = sender.read(indexes_buffer).await;
+        let indexes_buffer = PooledBuffer::with_capacity(indexes_size);
+        let (result, indexes_buffer) = sender.read(indexes_buffer.slice(0..indexes_size)).await;
         result?;
+        let indexes_buffer = indexes_buffer.into_inner();
 
         let messages_size =
             total_payload_size - metadata_size as usize - indexes_size - metadata_len_field_size;
-        let mut messages_buffer = PooledBuffer::with_capacity(messages_size);
-        unsafe { messages_buffer.set_len(messages_size) };
-        let (result, messages_buffer) = sender.read(messages_buffer).await;
+        let messages_buffer = PooledBuffer::with_capacity(messages_size);
+        let (result, messages_buffer) = sender.read(messages_buffer.slice(0..messages_size)).await;
         result?;
+        let messages_buffer = messages_buffer.into_inner();
 
         let indexes = IggyIndexesMut::from_bytes(indexes_buffer, 0);
         let batch = IggyMessagesBatchMut::from_indexes_and_messages(
diff --git a/core/server/src/streaming/utils/memory_pool.rs b/core/server/src/streaming/utils/memory_pool.rs
index 16ad2087..675c0076 100644
--- a/core/server/src/streaming/utils/memory_pool.rs
+++ b/core/server/src/streaming/utils/memory_pool.rs
@@ -155,21 +155,12 @@ impl MemoryPool {
 
     /// Initialize the global pool from the given config.
     pub fn init_pool(config: Arc<SystemConfig>) {
-        // TODO: Fixme (stary napraw).
-        let is_enabled = false;
+        let is_enabled = config.memory_pool.enabled;
         let memory_limit = config.memory_pool.size.as_bytes_usize();
         let bucket_capacity = config.memory_pool.bucket_capacity as usize;
 
-        let pool = MemoryPool::new(is_enabled, memory_limit, bucket_capacity);
-        if MEMORY_POOL.set(pool).is_err() {
-            warn!("Memory pool already initialized.");
-            // This shouldn't ever happen in production code, only in tests
-            // if someone forgets to add #[serial] tag to tests that have different
-            // memory pool limits (different instances are created within same executable).
-            if memory_pool().memory_limit != memory_limit {
-                panic!("Previously initialized memory pool has a different limit.");
-            }
-        }
+        let _ =
+            MEMORY_POOL.get_or_init(|| MemoryPool::new(is_enabled, memory_limit, bucket_capacity));
     }
 
     /// Acquire a `BytesMut` buffer with at least `capacity` bytes.
diff --git a/core/server/src/streaming/utils/pooled_buffer.rs b/core/server/src/streaming/utils/pooled_buffer.rs
index 39ef25be..6bdaa357 100644
--- a/core/server/src/streaming/utils/pooled_buffer.rs
+++ b/core/server/src/streaming/utils/pooled_buffer.rs
@@ -229,7 +229,7 @@ impl SetBufInit for PooledBuffer {
 
 unsafe impl IoBufMut for PooledBuffer {
     fn as_buf_mut_ptr(&mut self) -> *mut u8 {
-        self.inner.as_buf_mut_ptr()
+        self.inner.as_mut_ptr()
     }
 }
 

Reply via email to