This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch io_uring_tpc_direct_io_socket_transfer in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 0ec4c6568de1fe9fe85c3c64d11ae0287a5c6946 Author: Hubert Gruszecki <[email protected]> AuthorDate: Wed Jul 9 19:56:20 2025 +0200 4096 alignment --- .../handlers/messages/send_messages_handler.rs | 6 +++--- core/server/src/streaming/segments/direct_file.rs | 22 +++++----------------- core/server/src/streaming/utils/memory_pool.rs | 2 +- 3 files changed, 9 insertions(+), 21 deletions(-) diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs b/core/server/src/binary/handlers/messages/send_messages_handler.rs index 5813c2588..bcd3e016e 100644 --- a/core/server/src/binary/handlers/messages/send_messages_handler.rs +++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs @@ -25,7 +25,7 @@ use crate::shard::transmission::frame::ShardResponse; use crate::shard::transmission::message::{ShardMessage, ShardRequest, ShardRequestPayload}; use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut}; use crate::streaming::session::Session; -use crate::streaming::utils::PooledBuffer; +use crate::streaming::utils::{ALIGNMENT, PooledBuffer}; use anyhow::Result; use compio::buf::{IntoInner as _, IoBuf}; use iggy_common::Identifier; @@ -93,7 +93,7 @@ impl ServerCommandHandler for SendMessages { ); let indexes_size = messages_count as usize * INDEX_SIZE; - let mut indexes_buffer = PooledBuffer::with_capacity(indexes_size + 512); // extra space for possible padding to not cause reallocations + let mut indexes_buffer = PooledBuffer::with_capacity(indexes_size + ALIGNMENT); // extra space for possible padding to not cause reallocations let indexes_buffer = sender .read(indexes_buffer.slice(0..indexes_size)) .await? @@ -102,7 +102,7 @@ impl ServerCommandHandler for SendMessages { let messages_size = total_payload_size - metadata_size as usize - indexes_size - metadata_len_field_size; - let mut messages_buffer = PooledBuffer::with_capacity(messages_size + 512); // extra space for possible padding to not cause reallocations + let mut messages_buffer = PooledBuffer::with_capacity(messages_size + ALIGNMENT); // extra space for possible padding to not cause reallocations let messages_buffer = sender .read(messages_buffer.slice(0..messages_size)) .await? diff --git a/core/server/src/streaming/segments/direct_file.rs b/core/server/src/streaming/segments/direct_file.rs index 7de7c4b33..76981c0bd 100644 --- a/core/server/src/streaming/segments/direct_file.rs +++ b/core/server/src/streaming/segments/direct_file.rs @@ -17,7 +17,7 @@ */ use crate::streaming::utils::{ALIGNMENT, PooledBuffer}; -use compio::buf::{IntoInner, IoBuf}; +use compio::buf::IoBuf; use compio::fs::{File, OpenOptions}; use compio::io::AsyncWriteAtExt; use error_set::ErrContext; @@ -25,7 +25,7 @@ use iggy_common::IggyError; const O_DIRECT: i32 = 0x4000; const O_DSYNC: i32 = 0x1000; - +const SCRATCH_SIZE: usize = ALIGNMENT * 8; /// Cache line padding to prevent false sharing /// Most modern CPUs have 64-byte cache lines #[repr(align(64))] @@ -34,9 +34,9 @@ struct Padded<T>(T); /// Stack-allocated scratch buffer for small writes /// Aligned to 4KiB for optimal performance -#[repr(align(512))] +#[repr(align(4096))] #[derive(Debug)] -struct ScratchBuffer([u8; 4096]); +struct ScratchBuffer([u8; SCRATCH_SIZE]); /// A wrapper that allows us to pass a slice of ScratchBuffer as IoBuf /// This is safe because DirectFile owns the ScratchBuffer for its entire lifetime @@ -124,7 +124,7 @@ impl DirectFile { tail: PooledBuffer::with_capacity(ALIGNMENT), tail_len: Padded(0), spare: PooledBuffer::with_capacity(ALIGNMENT), - scratch: ScratchBuffer([0u8; 4096]), + scratch: ScratchBuffer([0u8; SCRATCH_SIZE]), }) } @@ -142,18 +142,6 @@ impl DirectFile { .map(|metadata| metadata.len()) } - fn new(file: File, file_path: String, initial_position: u64) -> Self { - Self { - file_path, - file, - file_position: initial_position, - tail: PooledBuffer::with_capacity(ALIGNMENT), - tail_len: Padded(0), - spare: PooledBuffer::with_capacity(ALIGNMENT), - scratch: ScratchBuffer([0u8; 4096]), - } - } - /// Write data from an owned PooledBuffer with zero-copy optimization /// /// This method takes ownership of the buffer to satisfy the 'static lifetime diff --git a/core/server/src/streaming/utils/memory_pool.rs b/core/server/src/streaming/utils/memory_pool.rs index df0783b14..5d24a0407 100644 --- a/core/server/src/streaming/utils/memory_pool.rs +++ b/core/server/src/streaming/utils/memory_pool.rs @@ -26,7 +26,7 @@ use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use tracing::{error, info, trace, warn}; -pub const ALIGNMENT: usize = 512; +pub const ALIGNMENT: usize = 4096; pub type Align512 = ConstAlign<ALIGNMENT>; pub type AlignedBuffer = AVec<u8, Align512>;
