This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch io_uring_tpc_direct_io_socket_transfer
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 0ec4c6568de1fe9fe85c3c64d11ae0287a5c6946
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Wed Jul 9 19:56:20 2025 +0200

    4096 alignment
---
 .../handlers/messages/send_messages_handler.rs     |  6 +++---
 core/server/src/streaming/segments/direct_file.rs  | 22 +++++-----------------
 core/server/src/streaming/utils/memory_pool.rs     |  2 +-
 3 files changed, 9 insertions(+), 21 deletions(-)

diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs b/core/server/src/binary/handlers/messages/send_messages_handler.rs
index 5813c2588..bcd3e016e 100644
--- a/core/server/src/binary/handlers/messages/send_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs
@@ -25,7 +25,7 @@ use crate::shard::transmission::frame::ShardResponse;
 use crate::shard::transmission::message::{ShardMessage, ShardRequest, ShardRequestPayload};
 use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut};
 use crate::streaming::session::Session;
-use crate::streaming::utils::PooledBuffer;
+use crate::streaming::utils::{ALIGNMENT, PooledBuffer};
 use anyhow::Result;
 use compio::buf::{IntoInner as _, IoBuf};
 use iggy_common::Identifier;
@@ -93,7 +93,7 @@ impl ServerCommandHandler for SendMessages {
         );
         let indexes_size = messages_count as usize * INDEX_SIZE;
 
-        let mut indexes_buffer = PooledBuffer::with_capacity(indexes_size + 512); // extra space for possible padding to not cause reallocations
+        let mut indexes_buffer = PooledBuffer::with_capacity(indexes_size + ALIGNMENT); // extra space for possible padding to not cause reallocations
         let indexes_buffer = sender
             .read(indexes_buffer.slice(0..indexes_size))
             .await?
@@ -102,7 +102,7 @@ impl ServerCommandHandler for SendMessages {
         let messages_size =
             total_payload_size - metadata_size as usize - indexes_size - metadata_len_field_size;
 
-        let mut messages_buffer = PooledBuffer::with_capacity(messages_size + 512); // extra space for possible padding to not cause reallocations
+        let mut messages_buffer = PooledBuffer::with_capacity(messages_size + ALIGNMENT); // extra space for possible padding to not cause reallocations
         let messages_buffer = sender
             .read(messages_buffer.slice(0..messages_size))
             .await?
diff --git a/core/server/src/streaming/segments/direct_file.rs b/core/server/src/streaming/segments/direct_file.rs
index 7de7c4b33..76981c0bd 100644
--- a/core/server/src/streaming/segments/direct_file.rs
+++ b/core/server/src/streaming/segments/direct_file.rs
@@ -17,7 +17,7 @@
  */
 
 use crate::streaming::utils::{ALIGNMENT, PooledBuffer};
-use compio::buf::{IntoInner, IoBuf};
+use compio::buf::IoBuf;
 use compio::fs::{File, OpenOptions};
 use compio::io::AsyncWriteAtExt;
 use error_set::ErrContext;
@@ -25,7 +25,7 @@ use iggy_common::IggyError;
 
 const O_DIRECT: i32 = 0x4000;
 const O_DSYNC: i32 = 0x1000;
-
+const SCRATCH_SIZE: usize = ALIGNMENT * 8;
 /// Cache line padding to prevent false sharing
 /// Most modern CPUs have 64-byte cache lines
 #[repr(align(64))]
@@ -34,9 +34,9 @@ struct Padded<T>(T);
 
 /// Stack-allocated scratch buffer for small writes
 /// Aligned to 4KiB for optimal performance
-#[repr(align(512))]
+#[repr(align(4096))]
 #[derive(Debug)]
-struct ScratchBuffer([u8; 4096]);
+struct ScratchBuffer([u8; SCRATCH_SIZE]);
 
 /// A wrapper that allows us to pass a slice of ScratchBuffer as IoBuf
 /// This is safe because DirectFile owns the ScratchBuffer for its entire lifetime
@@ -124,7 +124,7 @@ impl DirectFile {
             tail: PooledBuffer::with_capacity(ALIGNMENT),
             tail_len: Padded(0),
             spare: PooledBuffer::with_capacity(ALIGNMENT),
-            scratch: ScratchBuffer([0u8; 4096]),
+            scratch: ScratchBuffer([0u8; SCRATCH_SIZE]),
         })
     }
 
@@ -142,18 +142,6 @@ impl DirectFile {
             .map(|metadata| metadata.len())
     }
 
-    fn new(file: File, file_path: String, initial_position: u64) -> Self {
-        Self {
-            file_path,
-            file,
-            file_position: initial_position,
-            tail: PooledBuffer::with_capacity(ALIGNMENT),
-            tail_len: Padded(0),
-            spare: PooledBuffer::with_capacity(ALIGNMENT),
-            scratch: ScratchBuffer([0u8; 4096]),
-        }
-    }
-
     /// Write data from an owned PooledBuffer with zero-copy optimization
     ///
     /// This method takes ownership of the buffer to satisfy the 'static lifetime
diff --git a/core/server/src/streaming/utils/memory_pool.rs b/core/server/src/streaming/utils/memory_pool.rs
index df0783b14..5d24a0407 100644
--- a/core/server/src/streaming/utils/memory_pool.rs
+++ b/core/server/src/streaming/utils/memory_pool.rs
@@ -26,7 +26,7 @@ use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use tracing::{error, info, trace, warn};
 
-pub const ALIGNMENT: usize = 512;
+pub const ALIGNMENT: usize = 4096;
 pub type Align512 = ConstAlign<ALIGNMENT>;
 pub type AlignedBuffer = AVec<u8, Align512>;
 

Reply via email to