This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch io_uring_tpc_direct_io_socket_transfer
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 9e984026bf01c1a77b60041a166bc4c65d24b7b3
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Jul 7 21:33:25 2025 +0200

    feat(io_uring): implement directio and aligned PooledBuffer
---
 Cargo.lock                                         | 1018 ++++----------------
 core/server/Cargo.toml                             |    1 +
 .../handlers/messages/send_messages_handler.rs     |   42 +-
 core/server/src/binary/handlers/utils.rs           |    3 +-
 core/server/src/binary/sender.rs                   |    7 +-
 core/server/src/main.rs                            |   11 +
 core/server/src/quic/quic_sender.rs                |    2 +-
 core/server/src/shard/mod.rs                       |   28 +-
 core/server/src/shard/system/storage.rs            |    2 +-
 core/server/src/streaming/segments/direct_file.rs  |  443 +++++++++
 .../src/streaming/segments/indexes/index_reader.rs |   27 +-
 .../src/streaming/segments/indexes/index_writer.rs |  101 +-
 .../streaming/segments/messages/messages_reader.rs |   26 +-
 .../streaming/segments/messages/messages_writer.rs |  113 +--
 core/server/src/streaming/segments/messages/mod.rs |   28 +-
 core/server/src/streaming/segments/mod.rs          |    2 +
 core/server/src/streaming/segments/segment.rs      |   40 +-
 .../streaming/segments/types/messages_batch_mut.rs |   15 +-
 .../src/streaming/segments/writing_messages.rs     |   12 +-
 core/server/src/streaming/utils/memory_pool.rs     |   64 +-
 core/server/src/streaming/utils/mod.rs             |    2 +-
 core/server/src/streaming/utils/pooled_buffer.rs   |  152 +--
 core/server/src/tcp/connection_handler.rs          |   13 +-
 core/server/src/tcp/sender.rs                      |   18 +-
 core/server/src/tcp/tcp_sender.rs                  |    6 +-
 core/server/src/tcp/tcp_tls_sender.rs              |    2 +-
 26 files changed, 977 insertions(+), 1201 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 2a32b031d..20d0a61bd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -308,12 +308,12 @@ dependencies = [
 ]
 
 [[package]]
-name = "aligned-array"
-version = "1.0.1"
+name = "aligned-vec"
+version = "0.6.4"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "e05c92d086290f52938013f6242ac62bf7d401fab8ad36798a609faa65c3fd2c"
+checksum = "dc890384c8602f339876ded803c97ad529f3842aba97f6392b3dba0dd171769b"
 dependencies = [
- "generic-array",
+ "equator",
 ]
 
 [[package]]
@@ -619,15 +619,6 @@ dependencies = [
  "tokio-util",
 ]
 
-[[package]]
-name = "atoi"
-version = "2.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528"
-dependencies = [
- "num-traits",
-]
-
 [[package]]
 name = "atomic"
 version = "0.6.1"
@@ -652,12 +643,44 @@ version = "1.1.2"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
 
+[[package]]
+name = "attohttpc"
+version = "0.28.5"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "07a9b245ba0739fc90935094c29adbaee3f977218b5fb95e822e261cda7f56a3"
+dependencies = [
+ "http 1.3.1",
+ "log",
+ "rustls",
+ "serde",
+ "serde_json",
+ "url",
+ "webpki-roots 0.26.11",
+]
+
 [[package]]
 name = "autocfg"
 version = "1.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
 
+[[package]]
+name = "aws-creds"
+version = "0.38.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "ba912106484991c456adb3364338a2534d0818bd9374b324b608074e3b55f581"
+dependencies = [
+ "attohttpc",
+ "home",
+ "log",
+ "quick-xml 0.32.0",
+ "rust-ini",
+ "serde",
+ "thiserror 1.0.69",
+ "time",
+ "url",
+]
+
 [[package]]
 name = "aws-lc-rs"
 version = "1.13.1"
@@ -681,6 +704,15 @@ dependencies = [
  "fs_extra",
 ]
 
+[[package]]
+name = "aws-region"
+version = "0.26.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "73ae4ae7c45238b60af0a3b27ef2fcc7bd5b8fdcd8a6d679919558b40d3eff7a"
+dependencies = [
+ "thiserror 1.0.69",
+]
+
 [[package]]
 name = "axum"
 version = "0.7.9"
@@ -831,12 +863,6 @@ version = "0.22.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
 
-[[package]]
-name = "base64ct"
-version = "1.8.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "55248b47b0caf0546f7988906588779981c43bb1bc9d0c44087278f80cdb44ba"
-
 [[package]]
 name = "bcrypt"
 version = "0.17.0"
@@ -1285,6 +1311,15 @@ dependencies = [
  "thiserror 2.0.12",
 ]
 
+[[package]]
+name = "castaway"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "0abae9be0aaf9ea96a3b1b8b1b55c602ca751eba1b1500220cea4ecbafe7c0d5"
+dependencies = [
+ "rustversion",
+]
+
 [[package]]
 name = "cc"
 version = "1.2.27"
@@ -1492,23 +1527,16 @@ dependencies = [
 ]
 
 [[package]]
-name = "compio"
-version = "0.15.0"
+name = "compact_str"
+version = "0.7.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "713c6293af093c202ad318e8f7bdc1de1a36d7a793bb77f7fc6bd6f1788659a9"
+checksum = "f86b9c4c00838774a6d902ef931eff7470720c51d90c2e32cfe15dc304737b3f"
 dependencies = [
- "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "compio-dispatcher",
- "compio-driver 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
- "compio-fs 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "compio-io 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "compio-log 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "compio-net 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "compio-process",
- "compio-quic",
- "compio-runtime 0.8.1 
(registry+https://github.com/rust-lang/crates.io-index)",
- "compio-signal",
- "compio-tls 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "castaway",
+ "cfg-if",
+ "itoa",
+ "ryu",
+ "static_assertions",
 ]
 
 [[package]]
@@ -1516,26 +1544,15 @@ name = "compio"
 version = "0.15.0"
 source = 
"git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be#fe4243f0b6811ebc325afd081c9b087b4d9817be";
 dependencies = [
- "compio-buf 0.6.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
- "compio-driver 0.8.1 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
- "compio-fs 0.8.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
- "compio-io 0.7.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
- "compio-log 0.1.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-buf",
+ "compio-driver",
+ "compio-fs",
+ "compio-io",
+ "compio-log",
  "compio-macros",
- "compio-net 0.8.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
- "compio-runtime 0.8.1 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
- "compio-tls 0.6.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
-]
-
-[[package]]
-name = "compio-buf"
-version = "0.6.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "3ce94a45a47ef8c0e3f44084fe67c8effc25e7ac1de6de2ee1a29a59e6c6ba8e"
-dependencies = [
- "arrayvec",
- "bytes",
- "libc",
+ "compio-net",
+ "compio-runtime",
+ "compio-tls",
 ]
 
 [[package]]
@@ -1548,43 +1565,6 @@ dependencies = [
  "libc",
 ]
 
-[[package]]
-name = "compio-dispatcher"
-version = "0.7.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "0cdf8c613be826be410d8744ab30acc49cc5134a78e2aa25efae9efa44bed6a7"
-dependencies = [
- "compio-driver 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
- "compio-runtime 0.8.1 
(registry+https://github.com/rust-lang/crates.io-index)",
- "flume",
- "futures-channel",
-]
-
-[[package]]
-name = "compio-driver"
-version = "0.8.1"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "737212fe00b4af769f7e8f156c25ffafd5888d4d21834e100ea068dea1086ef8"
-dependencies = [
- "aligned-array",
- "cfg-if",
- "cfg_aliases",
- "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "compio-log 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "crossbeam-channel",
- "crossbeam-queue",
- "futures-util",
- "io-uring",
- "io_uring_buf_ring",
- "libc",
- "once_cell",
- "paste",
- "polling",
- "slab",
- "socket2 0.5.10",
- "windows-sys 0.52.0",
-]
-
 [[package]]
 name = "compio-driver"
 version = "0.8.1"
@@ -1592,8 +1572,8 @@ source = 
"git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd
 dependencies = [
  "cfg-if",
  "cfg_aliases",
- "compio-buf 0.6.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
- "compio-log 0.1.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-buf",
+ "compio-log",
  "crossbeam-channel",
  "crossbeam-queue",
  "futures-util",
@@ -1608,24 +1588,6 @@ dependencies = [
  "windows-sys 0.60.2",
 ]
 
-[[package]]
-name = "compio-fs"
-version = "0.8.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "9bcf65e631d521c666bca25595f8e5c78173e96f0b3b61f0a7d93f31d9661d32"
-dependencies = [
- "cfg-if",
- "cfg_aliases",
- "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "compio-driver 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
- "compio-io 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "compio-runtime 0.8.1 
(registry+https://github.com/rust-lang/crates.io-index)",
- "libc",
- "os_pipe",
- "widestring",
- "windows-sys 0.52.0",
-]
-
 [[package]]
 name = "compio-fs"
 version = "0.8.0"
@@ -1633,48 +1595,27 @@ source = 
"git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd
 dependencies = [
  "cfg-if",
  "cfg_aliases",
- "compio-buf 0.6.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
- "compio-driver 0.8.1 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
- "compio-io 0.7.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
- "compio-runtime 0.8.1 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-buf",
+ "compio-driver",
+ "compio-io",
+ "compio-runtime",
  "libc",
  "os_pipe",
  "widestring",
  "windows-sys 0.60.2",
 ]
 
-[[package]]
-name = "compio-io"
-version = "0.7.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "c2b05cc4142659f2c90b6e44c68568ff71c83c6fb9285aca686952250b914932"
-dependencies = [
- "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "futures-util",
- "paste",
- "pin-project-lite",
-]
-
 [[package]]
 name = "compio-io"
 version = "0.7.0"
 source = 
"git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be#fe4243f0b6811ebc325afd081c9b087b4d9817be";
 dependencies = [
- "compio-buf 0.6.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-buf",
  "futures-util",
  "paste",
  "pin-project-lite",
 ]
 
-[[package]]
-name = "compio-log"
-version = "0.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "fc4e560213c1996b618da369b7c9109564b41af9033802ae534465c4ee4e132f"
-dependencies = [
- "tracing",
-]
-
 [[package]]
 name = "compio-log"
 version = "0.1.0"
@@ -1694,35 +1635,16 @@ dependencies = [
  "syn 2.0.104",
 ]
 
-[[package]]
-name = "compio-net"
-version = "0.8.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "0c1fabe3393bc0c3a0dca8e99a35bf97e42caa12bb3cc6bba83df04e28c9c142"
-dependencies = [
- "cfg-if",
- "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "compio-driver 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
- "compio-io 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "compio-runtime 0.8.1 
(registry+https://github.com/rust-lang/crates.io-index)",
- "either",
- "libc",
- "once_cell",
- "socket2 0.5.10",
- "widestring",
- "windows-sys 0.52.0",
-]
-
 [[package]]
 name = "compio-net"
 version = "0.8.0"
 source = 
"git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be#fe4243f0b6811ebc325afd081c9b087b4d9817be";
 dependencies = [
  "cfg-if",
- "compio-buf 0.6.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
- "compio-driver 0.8.1 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
- "compio-io 0.7.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
- "compio-runtime 0.8.1 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-buf",
+ "compio-driver",
+ "compio-io",
+ "compio-runtime",
  "either",
  "libc",
  "once_cell",
@@ -1731,64 +1653,6 @@ dependencies = [
  "windows-sys 0.60.2",
 ]
 
-[[package]]
-name = "compio-process"
-version = "0.5.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "3867cfe7b23eaae89ff815aba4fdde61cb6fd55f81fd368128300c6b7e645016"
-dependencies = [
- "cfg-if",
- "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "compio-driver 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
- "compio-io 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "compio-runtime 0.8.1 
(registry+https://github.com/rust-lang/crates.io-index)",
- "futures-util",
- "windows-sys 0.52.0",
-]
-
-[[package]]
-name = "compio-quic"
-version = "0.4.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "6f107e044329f1e171930801b09bfc6e764c5e171e45c7a3e382f98561da619a"
-dependencies = [
- "cfg_aliases",
- "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "compio-io 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "compio-log 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "compio-net 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "compio-runtime 0.8.1 
(registry+https://github.com/rust-lang/crates.io-index)",
- "flume",
- "futures-util",
- "libc",
- "quinn-proto",
- "rustc-hash 2.1.1",
- "rustls",
- "thiserror 2.0.12",
- "windows-sys 0.52.0",
-]
-
-[[package]]
-name = "compio-runtime"
-version = "0.8.1"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "b7df559e87b7ab05ba61c32619f6076dd5cc2daf5a8cb30cb9931fb355d20aff"
-dependencies = [
- "async-task",
- "cfg-if",
- "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "compio-driver 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
- "compio-log 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "crossbeam-queue",
- "futures-util",
- "libc",
- "once_cell",
- "scoped-tls",
- "slab",
- "socket2 0.5.10",
- "windows-sys 0.52.0",
-]
-
 [[package]]
 name = "compio-runtime"
 version = "0.8.1"
@@ -1796,9 +1660,9 @@ source = 
"git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd
 dependencies = [
  "async-task",
  "cfg-if",
- "compio-buf 0.6.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
- "compio-driver 0.8.1 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
- "compio-log 0.1.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-buf",
+ "compio-driver",
+ "compio-log",
  "core_affinity",
  "crossbeam-queue",
  "futures-util",
@@ -1810,40 +1674,13 @@ dependencies = [
  "windows-sys 0.60.2",
 ]
 
-[[package]]
-name = "compio-signal"
-version = "0.6.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "03d2931880b03b33d4df7d2b8a008e93731366d185358c7442fc8d24d5f9c1bd"
-dependencies = [
- "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "compio-driver 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
- "compio-runtime 0.8.1 
(registry+https://github.com/rust-lang/crates.io-index)",
- "libc",
- "once_cell",
- "os_pipe",
- "slab",
- "windows-sys 0.52.0",
-]
-
-[[package]]
-name = "compio-tls"
-version = "0.6.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "542bb0e0f6f65cb84bc09b7e052fa54f006d1ba228a8dfad6d7b9676defe7232"
-dependencies = [
- "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "compio-io 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "rustls",
-]
-
 [[package]]
 name = "compio-tls"
 version = "0.6.0"
 source = 
"git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be#fe4243f0b6811ebc325afd081c9b087b4d9817be";
 dependencies = [
- "compio-buf 0.6.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
- "compio-io 0.7.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-buf",
+ "compio-io",
  "rustls",
 ]
 
@@ -1937,12 +1774,6 @@ dependencies = [
  "wasm-bindgen",
 ]
 
-[[package]]
-name = "const-oid"
-version = "0.9.6"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8"
-
 [[package]]
 name = "const-random"
 version = "0.1.18"
@@ -2040,21 +1871,6 @@ dependencies = [
  "libc",
 ]
 
-[[package]]
-name = "crc"
-version = "3.3.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "9710d3b3739c2e349eb44fe848ad0b7c8cb1e42bd87ee49371df2f7acaf3e675"
-dependencies = [
- "crc-catalog",
-]
-
-[[package]]
-name = "crc-catalog"
-version = "2.4.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5"
-
 [[package]]
 name = "crc32fast"
 version = "1.4.2"
@@ -2259,46 +2075,6 @@ dependencies = [
  "regex-syntax 0.7.5",
 ]
 
-[[package]]
-name = "cyper"
-version = "0.4.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "89b65af5073b4f53c9697b611b414042e71c6a14e11088438a67b1ef36f51ca2"
-dependencies = [
- "async-stream",
- "base64 0.22.1",
- "compio 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "cyper-core",
- "encoding_rs",
- "futures-util",
- "http 1.3.1",
- "http-body-util",
- "hyper",
- "hyper-util",
- "mime",
- "send_wrapper",
- "serde",
- "serde_urlencoded",
- "thiserror 2.0.12",
- "url",
-]
-
-[[package]]
-name = "cyper-core"
-version = "0.4.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "6343deaa569c748860d9afefab636648e7e6f9abdfc26b3b9dde327170ae6b2b"
-dependencies = [
- "cfg-if",
- "compio 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "futures-util",
- "hyper",
- "hyper-util",
- "rustls-platform-verifier 0.6.0",
- "send_wrapper",
- "tower-service",
-]
-
 [[package]]
 name = "darling"
 version = "0.20.11"
@@ -2379,17 +2155,6 @@ version = "0.1.9"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "da692b8d1080ea3045efaab14434d40468c3d8657e42abddfffca87b428f4c1b"
 
-[[package]]
-name = "der"
-version = "0.7.10"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "e7c1832837b905bbfb5101e07cc24c8deddf52f93225eee6ead5f4d63d53ddcb"
-dependencies = [
- "const-oid",
- "pem-rfc7468",
- "zeroize",
-]
-
 [[package]]
 name = "deranged"
 version = "0.4.0"
@@ -2501,7 +2266,6 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
 dependencies = [
  "block-buffer",
- "const-oid",
  "crypto-common",
  "subtle",
 ]
@@ -2643,9 +2407,6 @@ name = "either"
 version = "1.15.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
-dependencies = [
- "serde",
-]
 
 [[package]]
 name = "embedded-io"
@@ -2710,10 +2471,30 @@ dependencies = [
 ]
 
 [[package]]
-name = "equivalent"
-version = "1.0.2"
+name = "equator"
+version = "0.4.2"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f"
+checksum = "4711b213838dfee0117e3be6ac926007d7f433d7bbe33595975d4190cb07e6fc"
+dependencies = [
+ "equator-macro",
+]
+
+[[package]]
+name = "equator-macro"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "44f23cf4b44bfce11a86ace86f8a73ffdec849c9fd00a386a53d278bd9e81fb3"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.104",
+]
+
+[[package]]
+name = "equivalent"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f"
 
 [[package]]
 name = "err_trail"
@@ -2758,17 +2539,6 @@ dependencies = [
  "syn 2.0.104",
 ]
 
-[[package]]
-name = "etcetera"
-version = "0.8.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "136d1b5283a1ab77bd9257427ffd09d8667ced0570b6f938942bc7568ed5b943"
-dependencies = [
- "cfg-if",
- "home",
- "windows-sys 0.48.0",
-]
-
 [[package]]
 name = "event-listener"
 version = "5.4.0"
@@ -3042,17 +2812,6 @@ dependencies = [
  "futures-util",
 ]
 
-[[package]]
-name = "futures-intrusive"
-version = "0.5.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "1d930c203dd0b6ff06e0201a4a2fe9149b43c684fd4420555b26d21b1a02956f"
-dependencies = [
- "futures-core",
- "lock_api",
- "parking_lot 0.12.4",
-]
-
 [[package]]
 name = "futures-io"
 version = "0.3.31"
@@ -3843,15 +3602,6 @@ version = "0.4.3"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
 
-[[package]]
-name = "hkdf"
-version = "0.12.4"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "7b5f8eb2ad728638ea2c7d47a21db23b7b58a72ed6a38256b8a1849f15fbbdf7"
-dependencies = [
- "hmac",
-]
-
 [[package]]
 name = "hmac"
 version = "0.12.1"
@@ -4219,7 +3969,6 @@ dependencies = [
  "integration",
  "nonzero_lit",
  "rand 0.9.1",
- "rayon",
  "serde",
  "sysinfo 0.35.2",
  "tokio",
@@ -4329,43 +4078,6 @@ dependencies = [
  "tracing",
 ]
 
-[[package]]
-name = "iggy_connector_postgres_sink"
-version = "0.1.0"
-dependencies = [
- "async-trait",
- "chrono",
- "dashmap",
- "futures",
- "iggy_connector_sdk",
- "once_cell",
- "serde",
- "simd-json",
- "sqlx",
- "tokio",
- "tracing",
-]
-
-[[package]]
-name = "iggy_connector_postgres_source"
-version = "0.1.0"
-dependencies = [
- "async-trait",
- "chrono",
- "dashmap",
- "futures",
- "humantime",
- "iggy_connector_sdk",
- "once_cell",
- "serde",
- "serde_json",
- "simd-json",
- "sqlx",
- "tokio",
- "tracing",
- "uuid",
-]
-
 [[package]]
 name = "iggy_connector_quickwit_sink"
 version = "0.1.0"
@@ -4591,7 +4303,6 @@ dependencies = [
  "async-trait",
  "bytes",
  "chrono",
- "compio 0.15.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
  "ctor",
  "derive_more 2.0.1",
  "env_logger",
@@ -4869,9 +4580,6 @@ name = "lazy_static"
 version = "1.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
-dependencies = [
- "spin",
-]
 
 [[package]]
 name = "lazycell"
@@ -4968,12 +4676,6 @@ dependencies = [
  "pkg-config",
 ]
 
-[[package]]
-name = "libm"
-version = "0.2.15"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de"
-
 [[package]]
 name = "libmimalloc-sys"
 version = "0.1.43"
@@ -4995,16 +4697,6 @@ dependencies = [
  "redox_syscall 0.5.13",
 ]
 
-[[package]]
-name = "libsqlite3-sys"
-version = "0.30.1"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149"
-dependencies = [
- "pkg-config",
- "vcpkg",
-]
-
 [[package]]
 name = "libyml"
 version = "0.0.5"
@@ -5184,15 +4876,22 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3"
 
 [[package]]
-name = "md-5"
-version = "0.10.6"
+name = "maybe-async"
+version = "0.2.10"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf"
+checksum = "5cf92c10c7e361d6b99666ec1c6f9805b0bea2c3bd8c78dc6fe98ac5bd78db11"
 dependencies = [
- "cfg-if",
- "digest",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.104",
 ]
 
+[[package]]
+name = "md5"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
+
 [[package]]
 name = "memchr"
 version = "2.7.5"
@@ -5246,6 +4945,15 @@ dependencies = [
  "unicase",
 ]
 
+[[package]]
+name = "minidom"
+version = "0.16.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "e394a0e3c7ccc2daea3dffabe82f09857b6b510cb25af87d54bf3e910ac1642d"
+dependencies = [
+ "rxml",
+]
+
 [[package]]
 name = "minimal-lexical"
 version = "0.2.1"
@@ -5476,23 +5184,6 @@ dependencies = [
  "num-traits",
 ]
 
-[[package]]
-name = "num-bigint-dig"
-version = "0.8.4"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "dc84195820f291c7697304f3cbdadd1cb7199c0efc917ff5eafd71225c136151"
-dependencies = [
- "byteorder",
- "lazy_static",
- "libm",
- "num-integer",
- "num-iter",
- "num-traits",
- "rand 0.8.5",
- "smallvec",
- "zeroize",
-]
-
 [[package]]
 name = "num-complex"
 version = "0.4.6"
@@ -5561,7 +5252,6 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841"
 dependencies = [
  "autocfg",
- "libm",
 ]
 
 [[package]]
@@ -5996,15 +5686,6 @@ dependencies = [
  "serde",
 ]
 
-[[package]]
-name = "pem-rfc7468"
-version = "0.7.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "88b39c9bfcfc231068454382784bb460aae594343fb030d46e9f50a645418412"
-dependencies = [
- "base64ct",
-]
-
 [[package]]
 name = "percent-encoding"
 version = "2.3.1"
@@ -6098,27 +5779,6 @@ dependencies = [
  "thiserror 1.0.69",
 ]
 
-[[package]]
-name = "pkcs1"
-version = "0.7.5"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "c8ffb9f10fa047879315e6625af03c164b16962a5368d724ed16323b68ace47f"
-dependencies = [
- "der",
- "pkcs8",
- "spki",
-]
-
-[[package]]
-name = "pkcs8"
-version = "0.10.2"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7"
-dependencies = [
- "der",
- "spki",
-]
-
 [[package]]
 name = "pkg-config"
 version = "0.3.32"
@@ -6463,9 +6123,19 @@ dependencies = [
 
 [[package]]
 name = "quick-xml"
-version = "0.37.5"
+version = "0.32.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "331e97a1af0bf59823e6eadffe373d7b27f485be8748f71471c662c1f269b7fb"
+checksum = "1d3a6e5838b60e0e8fa7a43f22ade549a37d61f8bdbe636d0d7816191de969c2"
+dependencies = [
+ "memchr",
+ "serde",
+]
+
+[[package]]
+name = "quick-xml"
+version = "0.36.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "f7649a7b4df05aed9ea7ec6f628c67c9953a43869b8bc50929569b2999d443fe"
 dependencies = [
  "memchr",
  "serde",
@@ -6506,7 +6176,7 @@ dependencies = [
  "rustc-hash 2.1.1",
  "rustls",
  "rustls-pki-types",
- "rustls-platform-verifier 0.5.3",
+ "rustls-platform-verifier",
  "slab",
  "thiserror 2.0.12",
  "tinyvec",
@@ -6794,12 +6464,14 @@ dependencies = [
  "sync_wrapper",
  "tokio",
  "tokio-rustls",
+ "tokio-util",
  "tower 0.5.2",
  "tower-http",
  "tower-service",
  "url",
  "wasm-bindgen",
  "wasm-bindgen-futures",
+ "wasm-streams",
  "web-sys",
  "webpki-roots 1.0.1",
 ]
@@ -6911,26 +6583,6 @@ version = "0.3.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "afab94fb28594581f62d981211a9a4d53cc8130bbcbbb89a0440d9b8e81a7746"
 
-[[package]]
-name = "rsa"
-version = "0.9.8"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "78928ac1ed176a5ca1d17e578a1825f3d81ca54cf41053a592584b020cfd691b"
-dependencies = [
- "const-oid",
- "digest",
- "num-bigint-dig",
- "num-integer",
- "num-traits",
- "pkcs1",
- "pkcs8",
- "rand_core 0.6.4",
- "signature",
- "spki",
- "subtle",
- "zeroize",
-]
-
 [[package]]
 name = "rust-ini"
 version = "0.21.1"
@@ -6942,6 +6594,40 @@ dependencies = [
  "trim-in-place",
 ]
 
+[[package]]
+name = "rust-s3"
+version = "0.36.0-beta.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "cc7d6f3a3dd397743e8f344ffc80ea7137aee423983ae25b512e5332ad11362f"
+dependencies = [
+ "async-trait",
+ "aws-creds",
+ "aws-region",
+ "base64 0.22.1",
+ "bytes",
+ "cfg-if",
+ "futures",
+ "hex",
+ "hmac",
+ "http 1.3.1",
+ "log",
+ "maybe-async",
+ "md5",
+ "minidom",
+ "percent-encoding",
+ "quick-xml 0.36.2",
+ "reqwest",
+ "serde",
+ "serde_derive",
+ "serde_json",
+ "sha2",
+ "thiserror 1.0.69",
+ "time",
+ "tokio",
+ "tokio-stream",
+ "url",
+]
+
 [[package]]
 name = "rust_decimal"
 version = "1.37.2"
@@ -7079,27 +6765,6 @@ dependencies = [
  "windows-sys 0.59.0",
 ]
 
-[[package]]
-name = "rustls-platform-verifier"
-version = "0.6.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "eda84358ed17f1f354cf4b1909ad346e6c7bc2513e8c40eb08e0157aa13a9070"
-dependencies = [
- "core-foundation",
- "core-foundation-sys",
- "jni",
- "log",
- "once_cell",
- "rustls",
- "rustls-native-certs",
- "rustls-platform-verifier-android",
- "rustls-webpki",
- "security-framework",
- "security-framework-sys",
- "webpki-root-certs 1.0.1",
- "windows-sys 0.52.0",
-]
-
 [[package]]
 name = "rustls-platform-verifier-android"
 version = "0.1.1"
@@ -7125,22 +6790,22 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "8a0d197bd2c9dc6e53b84da9556a69ba4cdfab8619eb41a8bd1cc2027a0f6b1d"
 
 [[package]]
-name = "rusty-s3"
-version = "0.7.0"
+name = "rxml"
+version = "0.11.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "8f51a5a6b15f25d3e10c068039ee13befb6110fcb36c2b26317bcbdc23484d96"
+checksum = "65bc94b580d0f5a6b7a2d604e597513d3c673154b52ddeccd1d5c32360d945ee"
 dependencies = [
- "base64 0.22.1",
- "hmac",
- "md-5",
- "percent-encoding",
- "quick-xml",
- "serde",
- "serde_json",
- "sha2",
- "time",
- "url",
- "zeroize",
+ "bytes",
+ "rxml_validation",
+]
+
+[[package]]
+name = "rxml_validation"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "826e80413b9a35e9d33217b3dcac04cf95f6559d15944b93887a08be5496c4a4"
+dependencies = [
+ "compact_str",
 ]
 
 [[package]]
@@ -7274,15 +6939,6 @@ dependencies = [
  "serde",
 ]
 
-[[package]]
-name = "send_wrapper"
-version = "0.6.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73"
-dependencies = [
- "futures-core",
-]
-
 [[package]]
 name = "serde"
 version = "1.0.219"
@@ -7444,6 +7100,7 @@ name = "server"
 version = "0.5.0"
 dependencies = [
  "ahash 0.8.12",
+ "aligned-vec",
  "anyhow",
  "async-channel",
  "async_zip",
@@ -7455,11 +7112,10 @@ dependencies = [
  "bytes",
  "chrono",
  "clap",
- "compio 0.15.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio",
  "console-subscriber",
  "crossbeam",
  "ctrlc",
- "cyper",
  "dashmap",
  "derive_more 2.0.1",
  "dotenvy",
@@ -7488,9 +7144,9 @@ dependencies = [
  "quinn",
  "reqwest",
  "ring",
+ "rust-s3",
  "rustls",
  "rustls-pemfile",
- "rusty-s3",
  "serde",
  "serde_with",
  "serial_test",
@@ -7571,16 +7227,6 @@ dependencies = [
  "libc",
 ]
 
-[[package]]
-name = "signature"
-version = "2.2.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de"
-dependencies = [
- "digest",
- "rand_core 0.6.4",
-]
-
 [[package]]
 name = "simd-adler32"
 version = "0.3.7"
@@ -7637,9 +7283,6 @@ name = "smallvec"
 version = "1.15.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03"
-dependencies = [
- "serde",
-]
 
 [[package]]
 name = "smart-default"
@@ -7717,214 +7360,6 @@ dependencies = [
  "lock_api",
 ]
 
-[[package]]
-name = "spki"
-version = "0.7.3"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "d91ed6c858b01f942cd56b37a94b3e0a1798290327d1236e4d9cf4eaca44d29d"
-dependencies = [
- "base64ct",
- "der",
-]
-
-[[package]]
-name = "sqlx"
-version = "0.8.6"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "1fefb893899429669dcdd979aff487bd78f4064e5e7907e4269081e0ef7d97dc"
-dependencies = [
- "sqlx-core",
- "sqlx-macros",
- "sqlx-mysql",
- "sqlx-postgres",
- "sqlx-sqlite",
-]
-
-[[package]]
-name = "sqlx-core"
-version = "0.8.6"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "ee6798b1838b6a0f69c007c133b8df5866302197e404e8b6ee8ed3e3a5e68dc6"
-dependencies = [
- "base64 0.22.1",
- "bytes",
- "chrono",
- "crc",
- "crossbeam-queue",
- "either",
- "event-listener",
- "futures-core",
- "futures-intrusive",
- "futures-io",
- "futures-util",
- "hashbrown 0.15.4",
- "hashlink",
- "indexmap 2.10.0",
- "log",
- "memchr",
- "once_cell",
- "percent-encoding",
- "rustls",
- "serde",
- "serde_json",
- "sha2",
- "smallvec",
- "thiserror 2.0.12",
- "tokio",
- "tokio-stream",
- "tracing",
- "url",
- "uuid",
- "webpki-roots 0.26.11",
-]
-
-[[package]]
-name = "sqlx-macros"
-version = "0.8.6"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "a2d452988ccaacfbf5e0bdbc348fb91d7c8af5bee192173ac3636b5fb6e6715d"
-dependencies = [
- "proc-macro2",
- "quote",
- "sqlx-core",
- "sqlx-macros-core",
- "syn 2.0.104",
-]
-
-[[package]]
-name = "sqlx-macros-core"
-version = "0.8.6"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "19a9c1841124ac5a61741f96e1d9e2ec77424bf323962dd894bdb93f37d5219b"
-dependencies = [
- "dotenvy",
- "either",
- "heck 0.5.0",
- "hex",
- "once_cell",
- "proc-macro2",
- "quote",
- "serde",
- "serde_json",
- "sha2",
- "sqlx-core",
- "sqlx-mysql",
- "sqlx-postgres",
- "sqlx-sqlite",
- "syn 2.0.104",
- "tokio",
- "url",
-]
-
-[[package]]
-name = "sqlx-mysql"
-version = "0.8.6"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "aa003f0038df784eb8fecbbac13affe3da23b45194bd57dba231c8f48199c526"
-dependencies = [
- "atoi",
- "base64 0.22.1",
- "bitflags 2.9.1",
- "byteorder",
- "bytes",
- "chrono",
- "crc",
- "digest",
- "dotenvy",
- "either",
- "futures-channel",
- "futures-core",
- "futures-io",
- "futures-util",
- "generic-array",
- "hex",
- "hkdf",
- "hmac",
- "itoa",
- "log",
- "md-5",
- "memchr",
- "once_cell",
- "percent-encoding",
- "rand 0.8.5",
- "rsa",
- "serde",
- "sha1",
- "sha2",
- "smallvec",
- "sqlx-core",
- "stringprep",
- "thiserror 2.0.12",
- "tracing",
- "uuid",
- "whoami",
-]
-
-[[package]]
-name = "sqlx-postgres"
-version = "0.8.6"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "db58fcd5a53cf07c184b154801ff91347e4c30d17a3562a635ff028ad5deda46"
-dependencies = [
- "atoi",
- "base64 0.22.1",
- "bitflags 2.9.1",
- "byteorder",
- "chrono",
- "crc",
- "dotenvy",
- "etcetera",
- "futures-channel",
- "futures-core",
- "futures-util",
- "hex",
- "hkdf",
- "hmac",
- "home",
- "itoa",
- "log",
- "md-5",
- "memchr",
- "once_cell",
- "rand 0.8.5",
- "serde",
- "serde_json",
- "sha2",
- "smallvec",
- "sqlx-core",
- "stringprep",
- "thiserror 2.0.12",
- "tracing",
- "uuid",
- "whoami",
-]
-
-[[package]]
-name = "sqlx-sqlite"
-version = "0.8.6"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "c2d12fe70b2c1b4401038055f90f151b78208de1f9f89a7dbfd41587a10c3eea"
-dependencies = [
- "atoi",
- "chrono",
- "flume",
- "futures-channel",
- "futures-core",
- "futures-executor",
- "futures-intrusive",
- "futures-util",
- "libsqlite3-sys",
- "log",
- "percent-encoding",
- "serde",
- "serde_urlencoded",
- "sqlx-core",
- "thiserror 2.0.12",
- "tracing",
- "url",
- "uuid",
-]
-
 [[package]]
 name = "stable_deref_trait"
 version = "1.2.0"
@@ -7946,15 +7381,10 @@ dependencies = [
 ]
 
 [[package]]
-name = "stringprep"
-version = "0.1.5"
+name = "static_assertions"
+version = "1.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "7b4df3d392d81bd458a8a621b8bffbd2302a12ffe288a9d931670948749463b1"
-dependencies = [
- "unicode-bidi",
- "unicode-normalization",
- "unicode-properties",
-]
+checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
 
 [[package]]
 name = "strsim"
@@ -8730,12 +8160,6 @@ version = "2.8.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539"
 
-[[package]]
-name = "unicode-bidi"
-version = "0.3.18"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "5c1cb5db39152898a79168971543b1cb5020dff7fe43c8dc468b0885f5e29df5"
-
 [[package]]
 name = "unicode-ident"
 version = "1.0.18"
@@ -8748,21 +8172,6 @@ version = "0.1.5"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "3b09c83c3c29d37506a3e260c08c03743a6bb66a9cd432c6934ab501a190571f"
 
-[[package]]
-name = "unicode-normalization"
-version = "0.1.24"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "5033c97c4262335cded6d6fc3e5c18ab755e1a3dc96376350f3d8e9f009ad956"
-dependencies = [
- "tinyvec",
-]
-
-[[package]]
-name = "unicode-properties"
-version = "0.1.3"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "e70f2a8b45122e719eb623c01822704c4e0907e7e426a05927e1a1cfff5b75d0"
-
 [[package]]
 name = "unicode-segmentation"
 version = "1.12.0"
@@ -8987,12 +8396,6 @@ dependencies = [
  "wit-bindgen-rt",
 ]
 
-[[package]]
-name = "wasite"
-version = "0.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b"
-
 [[package]]
 name = "wasm-bindgen"
 version = "0.2.100"
@@ -9064,6 +8467,19 @@ dependencies = [
  "unicode-ident",
 ]
 
+[[package]]
+name = "wasm-streams"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65"
+dependencies = [
+ "futures-util",
+ "js-sys",
+ "wasm-bindgen",
+ "wasm-bindgen-futures",
+ "web-sys",
+]
+
 [[package]]
 name = "wasm-timer"
 version = "0.2.5"
@@ -9148,16 +8564,6 @@ dependencies = [
  "rustix 0.38.44",
 ]
 
-[[package]]
-name = "whoami"
-version = "1.6.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "6994d13118ab492c3c80c1f81928718159254c53c472bf9ce36f8dae4add02a7"
-dependencies = [
- "redox_syscall 0.5.13",
- "wasite",
-]
-
 [[package]]
 name = "wide"
 version = "0.7.33"
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index fa7fc48f0..2932faf09 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -36,6 +36,7 @@ mimalloc = ["dep:mimalloc"]
 
 [dependencies]
 ahash = { workspace = true }
+aligned-vec = "0.6"
 anyhow = { workspace = true }
 async_zip = { workspace = true }
 axum = { workspace = true }
diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs 
b/core/server/src/binary/handlers/messages/send_messages_handler.rs
index bb536c61e..6d155bcac 100644
--- a/core/server/src/binary/handlers/messages/send_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs
@@ -57,18 +57,19 @@ impl ServerCommandHandler for SendMessages {
         let total_payload_size = length as usize - std::mem::size_of::<u32>();
         let metadata_len_field_size = std::mem::size_of::<u32>();
 
-        let metadata_length_buffer = PooledBuffer::with_capacity(4);
-        let (result, metadata_len_buf) = 
sender.read(metadata_length_buffer.slice(0..4)).await;
-        let metadata_len_buf = metadata_len_buf.into_inner();
-        result?;
-        let metadata_size = 
u32::from_le_bytes(metadata_len_buf[..].try_into().unwrap());
-
-        let metadata_buffer = PooledBuffer::with_capacity(metadata_size as 
usize);
-        let (result, metadata_buf) = sender
+        let mut metadata_length_buffer = PooledBuffer::with_capacity(4);
+
+        let metadata_length_buffer = sender
+            .read(metadata_length_buffer.slice(0..4))
+            .await?
+            .into_inner();
+        let metadata_size = 
u32::from_le_bytes(metadata_length_buffer[0..4].try_into().unwrap());
+
+        let mut metadata_buffer = PooledBuffer::with_capacity(metadata_size as 
usize);
+        let metadata_buf = sender
             .read(metadata_buffer.slice(0..metadata_size as usize))
-            .await;
-        result?;
-        let metadata_buf = metadata_buf.into_inner();
+            .await?
+            .into_inner();
 
         let mut element_size = 0;
 
@@ -91,17 +92,20 @@ impl ServerCommandHandler for SendMessages {
         );
         let indexes_size = messages_count as usize * INDEX_SIZE;
 
-        let indexes_buffer = PooledBuffer::with_capacity(indexes_size);
-        let (result, indexes_buffer) = 
sender.read(indexes_buffer.slice(0..indexes_size)).await;
-        result?;
-        let indexes_buffer = indexes_buffer.into_inner();
+        let mut indexes_buffer = PooledBuffer::with_capacity(indexes_size + 
512); // extra space for possible padding to not cause reallocations
+        let indexes_buffer = sender
+            .read(indexes_buffer.slice(0..indexes_size))
+            .await?
+            .into_inner();
 
         let messages_size =
             total_payload_size - metadata_size as usize - indexes_size - 
metadata_len_field_size;
-        let messages_buffer = PooledBuffer::with_capacity(messages_size);
-        let (result, messages_buffer) = 
sender.read(messages_buffer.slice(0..messages_size)).await;
-        result?;
-        let messages_buffer = messages_buffer.into_inner();
+
+        let mut messages_buffer = PooledBuffer::with_capacity(messages_size + 
512); // extra space for possible padding to not cause reallocations
+        let messages_buffer = sender
+            .read(messages_buffer.slice(0..messages_size))
+            .await?
+            .into_inner();
 
         let indexes = IggyIndexesMut::from_bytes(indexes_buffer, 0);
         let batch = IggyMessagesBatchMut::from_indexes_and_messages(
diff --git a/core/server/src/binary/handlers/utils.rs 
b/core/server/src/binary/handlers/utils.rs
index 80bcede49..54411d912 100644
--- a/core/server/src/binary/handlers/utils.rs
+++ b/core/server/src/binary/handlers/utils.rs
@@ -32,8 +32,7 @@ pub async fn receive_and_validate(
     let buffer = if length == 0 {
         buffer
     } else {
-        let (result, buffer) = sender.read(buffer).await;
-        result?;
+        let buffer = sender.read(buffer).await?;
         buffer
     };
 
diff --git a/core/server/src/binary/sender.rs b/core/server/src/binary/sender.rs
index 96bed3d5c..c6f1a8112 100644
--- a/core/server/src/binary/sender.rs
+++ b/core/server/src/binary/sender.rs
@@ -56,10 +56,7 @@ macro_rules! forward_async_methods {
 }
 
 pub trait Sender {
-    fn read<B: IoBufMut>(
-        &mut self,
-        buffer: B,
-    ) -> impl Future<Output = (Result<usize, IggyError>, B)>;
+    fn read<B: IoBufMut>(&mut self, buffer: B) -> impl Future<Output = 
Result<B, IggyError>>;
     fn send_empty_ok_response(&mut self) -> impl Future<Output = Result<(), 
IggyError>>;
     fn send_ok_response(&mut self, payload: &[u8]) -> impl Future<Output = 
Result<(), IggyError>>;
     fn send_ok_response_vectored(
@@ -98,7 +95,7 @@ impl SenderKind {
     }
 
     forward_async_methods! {
-        async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<usize, 
IggyError>, B);
+        async fn read<B: IoBufMut>(&mut self, buffer: B) -> Result<B, 
IggyError>;
         async fn send_empty_ok_response(&mut self) -> Result<(), IggyError>;
         async fn send_ok_response(&mut self, payload: &[u8]) -> Result<(), 
IggyError>;
         async fn send_ok_response_vectored(&mut self, length: &[u8], slices: 
Vec<PooledBuffer>) -> Result<(), IggyError>;
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index b1481c228..c9798aaae 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -58,6 +58,15 @@ use tracing::{error, info, instrument};
 
 const COMPONENT: &str = "MAIN";
 
+fn thread_print_memory_pool_stats() {
+    std::thread::sleep(std::time::Duration::from_secs(5));
+    let pool = server::streaming::utils::memory_pool();
+    loop {
+        pool.log_stats();
+        std::thread::sleep(std::time::Duration::from_secs(5));
+    }
+}
+
 #[instrument(skip_all, name = "trace_start_server")]
 fn main() -> Result<(), ServerError> {
     let startup_timestamp = Instant::now();
@@ -312,6 +321,8 @@ fn main() -> Result<(), ServerError> {
         .expect("Error setting Ctrl-C handler");
     */
 
+    std::thread::spawn(thread_print_memory_pool_stats);
+
     info!("Iggy server is running. Press Ctrl+C or send SIGTERM to shutdown.");
     for (idx, handle) in handles.into_iter().enumerate() {
         info!("Waiting for shard thread {} to complete...", idx);
diff --git a/core/server/src/quic/quic_sender.rs 
b/core/server/src/quic/quic_sender.rs
index 54bda604e..30c4a6a7f 100644
--- a/core/server/src/quic/quic_sender.rs
+++ b/core/server/src/quic/quic_sender.rs
@@ -37,7 +37,7 @@ pub struct QuicSender {
 }
 
 impl Sender for QuicSender {
-    async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<usize, 
IggyError>, B) {
+    async fn read<B: IoBufMut>(&mut self, buffer: B) -> Result<B, IggyError> {
         //TODO: Fixme
         // Not-so-nice code because quinn recv stream has different API for 
read_exact
         /*
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 43280c777..019358680 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -283,16 +283,6 @@ impl IggyShard {
     }
 
     async fn load_version(&self) -> Result<(), IggyError> {
-        async fn update_system_info(
-            storage: &Rc<SystemStorage>,
-            system_info: &mut SystemInfo,
-            version: &SemanticVersion,
-        ) -> Result<(), IggyError> {
-            system_info.update_version(version);
-            storage.info.save(system_info).await?;
-            Ok(())
-        }
-
         let current_version = &self.version;
         let mut system_info;
         let load_system_info = self.storage.info.load().await;
@@ -301,7 +291,7 @@ impl IggyShard {
             if let IggyError::ResourceNotFound(_) = error {
                 info!("System info not found, creating...");
                 system_info = SystemInfo::default();
-                update_system_info(&self.storage, &mut system_info, 
current_version).await?;
+                Self::update_system_info(&self.storage, &mut system_info, 
current_version).await?;
             } else {
                 return Err(error);
             }
@@ -310,24 +300,34 @@ impl IggyShard {
         }
 
         info!("Loaded {system_info}.");
-        let loaded_version = 
SemanticVersion::from_str(&system_info.version.version)?;
+        let loaded_version = 
SemanticVersion::from_str(&system_info.version.version).unwrap();
         if current_version.is_equal_to(&loaded_version) {
             info!("System version {current_version} is up to date.");
         } else if current_version.is_greater_than(&loaded_version) {
             info!(
                 "System version {current_version} is greater than 
{loaded_version}, checking the available migrations..."
             );
-            update_system_info(&self.storage, &mut system_info, 
current_version).await?;
+            Self::update_system_info(&self.storage, &mut system_info, 
current_version).await?;
         } else {
             info!(
                 "System version {current_version} is lower than 
{loaded_version}, possible downgrade."
             );
-            update_system_info(&self.storage, &mut system_info, 
current_version).await?;
+            Self::update_system_info(&self.storage, &mut system_info, 
current_version).await?;
         }
 
         Ok(())
     }
 
+    async fn update_system_info(
+        storage: &Rc<SystemStorage>,
+        system_info: &mut SystemInfo,
+        version: &SemanticVersion,
+    ) -> Result<(), IggyError> {
+        system_info.update_version(version);
+        storage.info.save(system_info).await?;
+        Ok(())
+    }
+
     async fn load_state(&self) -> Result<SystemState, IggyError> {
         let state_entries = self.state.init().await.with_error_context(|error| 
{
             format!("{COMPONENT} (error: {error}) - failed to initialize state 
entries")
diff --git a/core/server/src/shard/system/storage.rs 
b/core/server/src/shard/system/storage.rs
index 736d8e488..ac4aca3ea 100644
--- a/core/server/src/shard/system/storage.rs
+++ b/core/server/src/shard/system/storage.rs
@@ -65,7 +65,7 @@ impl SystemInfoStorage for FileSystemInfoStorage {
         let file = file::open(&self.path)
             .await
             .map_err(|_| IggyError::CannotReadFile)?;
-        let buffer = PooledBuffer::with_capacity(file_size);
+        let mut buffer = PooledBuffer::with_capacity(file_size);
         let (result, buffer) = file
             .read_exact_at(buffer.slice(0..file_size), 0)
             .await
diff --git a/core/server/src/streaming/segments/direct_file.rs 
b/core/server/src/streaming/segments/direct_file.rs
new file mode 100644
index 000000000..ce1ecb788
--- /dev/null
+++ b/core/server/src/streaming/segments/direct_file.rs
@@ -0,0 +1,443 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::streaming::utils::{ALIGNMENT, PooledBuffer};
+use compio::fs::{File, OpenOptions};
+use compio::io::AsyncWriteAtExt;
+use error_set::ErrContext;
+use iggy_common::IggyError;
+
+#[derive(Debug)]
+pub struct DirectFile {
+    file_path: String,
+    file: File,
+    file_position: u64,
+    tail: PooledBuffer,
+    tail_len: usize,
+}
+
+impl DirectFile {
+    pub async fn open(
+        file_path: &str,
+        initial_position: u64,
+        file_exists: bool,
+    ) -> Result<Self, IggyError> {
+        let mut file = OpenOptions::new()
+            .create(true)
+            .write(true)
+            .custom_flags(0x4000)
+            .open(file_path)
+            .await
+            .with_error_context(|err| {
+                format!("Failed to open file with O_DIRECT: {file_path}, 
error: {err}")
+            })
+            .map_err(|_| IggyError::CannotReadFile)?;
+
+        if !file_exists {
+            let init_buffer = PooledBuffer::with_capacity(ALIGNMENT);
+            let (write_result, _) = file.write_all_at(init_buffer, 
0).await.into();
+            write_result
+                .with_error_context(|error| {
+                    tracing::error!(
+                        "Failed to initialize file with dummy block: 
{file_path}, error: {error}"
+                    );
+                    format!(
+                        "Failed to initialize file with dummy block: 
{file_path}, error: {error}"
+                    )
+                })
+                .map_err(|_| IggyError::CannotWriteToFile)?;
+
+            tracing::trace!("Successfully initialized new file with dummy 
block: {file_path}");
+        }
+
+        tracing::trace!(
+            "Successfully opened DirectFile: {}, position: {}, exists: {}",
+            file_path,
+            initial_position,
+            file_exists
+        );
+
+        Ok(Self {
+            file_path: file_path.to_string(),
+            file,
+            file_position: initial_position,
+            tail: PooledBuffer::with_capacity(ALIGNMENT),
+            tail_len: 0,
+        })
+    }
+
+    pub async fn get_file_size(&self) -> Result<u64, IggyError> {
+        self.file
+            .metadata()
+            .await
+            .with_error_context(|error| {
+                format!(
+                    "Failed to get metadata of file: {}, error: {error}",
+                    self.file_path
+                )
+            })
+            .map_err(|_| IggyError::CannotReadFileMetadata)
+            .map(|metadata| metadata.len())
+    }
+
+    fn new(file: File, file_path: String, initial_position: u64) -> Self {
+        Self {
+            file_path,
+            file,
+            file_position: initial_position,
+            tail: PooledBuffer::with_capacity(ALIGNMENT),
+            tail_len: 0,
+        }
+    }
+
+    pub async fn write_all(&mut self, mut data: &[u8]) -> Result<usize, 
IggyError> {
+        let initial_len = data.len();
+        tracing::trace!(
+            "DirectFile write_all called for file: {}, data_len: {}, position: 
{}, tail_len: {}",
+            self.file_path,
+            initial_len,
+            self.file_position,
+            self.tail_len
+        );
+
+        if self.tail_len > 0 {
+            let need = ALIGNMENT - self.tail_len;
+            let take = need.min(data.len());
+            self.tail.extend_from_slice(&data[..take]);
+            self.tail_len += take;
+            data = &data[take..];
+
+            if self.tail_len == ALIGNMENT {
+                self.flush_tail().await?;
+            }
+        }
+
+        if !data.is_empty() {
+            let whole_sectors_end = data.len() & !(ALIGNMENT - 1);
+            if whole_sectors_end > 0 {
+                let whole_sectors = &data[..whole_sectors_end];
+                let mut written = 0;
+
+                while written < whole_sectors.len() {
+                    let chunk_size = (whole_sectors.len() - written).min(128 * 
1024 * 1024);
+                    let chunk = &whole_sectors[written..written + chunk_size];
+
+                    let chunk_buffer = PooledBuffer::from(chunk);
+
+                    let (result, _) = self
+                        .file
+                        .write_all_at(chunk_buffer, self.file_position)
+                        .await
+                        .into();
+
+                    result.map_err(|e| {
+                        tracing::error!("Failed to write to direct file: {} at 
position {}, chunk size: {}, error: {}",
+                            self.file_path, self.file_position, chunk_size, e);
+                        IggyError::CannotWriteToFile
+                    })?;
+
+                    self.file_position += chunk_size as u64;
+                    written += chunk_size;
+                }
+
+                data = &data[whole_sectors_end..];
+            }
+        }
+
+        if !data.is_empty() {
+            self.tail.clear();
+            self.tail.extend_from_slice(data);
+            self.tail_len = data.len();
+        }
+
+        Ok(initial_len)
+    }
+
+    pub async fn flush(&mut self) -> Result<(), IggyError> {
+        if self.tail_len > 0 {
+            self.tail.resize(ALIGNMENT, 0);
+            self.flush_tail().await?;
+        }
+        Ok(())
+    }
+
+    pub fn position(&self) -> u64 {
+        self.file_position
+    }
+
+    pub fn tail_len(&self) -> usize {
+        self.tail_len
+    }
+
+    pub fn file_path(&self) -> &str {
+        &self.file_path
+    }
+
+    pub fn tail_buffer(&self) -> &PooledBuffer {
+        &self.tail
+    }
+
+    pub fn take_tail(&mut self) -> (PooledBuffer, usize) {
+        let tail = std::mem::replace(&mut self.tail, 
PooledBuffer::with_capacity(ALIGNMENT));
+        let tail_len = self.tail_len;
+        self.tail_len = 0;
+        (tail, tail_len)
+    }
+
+    pub fn set_tail(&mut self, tail: PooledBuffer, tail_len: usize) {
+        self.tail = tail;
+        self.tail_len = tail_len;
+    }
+
+    async fn flush_tail(&mut self) -> Result<(), IggyError> {
+        assert_eq!(self.tail.len(), ALIGNMENT);
+
+        let tail_buffer = std::mem::replace(&mut self.tail, 
PooledBuffer::with_capacity(ALIGNMENT));
+
+        let (result, returned_buf) = self
+            .file
+            .write_all_at(tail_buffer, self.file_position)
+            .await
+            .into();
+
+        result.map_err(|e| {
+            tracing::error!(
+                "Failed to flush tail for file: {} at position {}, tail size: 
{}, error: {}",
+                self.file_path,
+                self.file_position,
+                ALIGNMENT,
+                e
+            );
+            IggyError::CannotWriteToFile
+        })?;
+
+        self.file_position += ALIGNMENT as u64;
+        self.tail_len = 0;
+        self.tail = returned_buf;
+        self.tail.clear();
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::configs::system::SystemConfig;
+    use crate::streaming::utils::MemoryPool;
+    use compio::fs::OpenOptions;
+    use compio::io::{AsyncReadAt, AsyncReadAtExt};
+    use std::sync::Arc;
+    use tempfile::tempdir;
+
+    #[test]
+    fn test_direct_file_small_writes() {
+        compio::runtime::Runtime::new().unwrap().block_on(async {
+            MemoryPool::init_pool(Arc::new(SystemConfig::default()));
+            let temp_dir = tempdir().unwrap();
+            let file_path = temp_dir.path().join("test_direct_io.bin");
+
+            let mut direct_file = 
DirectFile::open(file_path.to_str().unwrap(), 0, false)
+                .await
+                .unwrap();
+
+            for i in 0..10u64 {
+                let buf = i.to_le_bytes();
+                direct_file.write_all(&buf).await.unwrap();
+            }
+
+            direct_file.flush().await.unwrap();
+
+            let file = OpenOptions::new()
+                .read(true)
+                .custom_flags(0x4000)
+                .open(&file_path)
+                .await
+                .unwrap();
+
+            let mut read_buffer = vec![0u8; 512];
+            let (result, buf) = file.read_at(read_buffer, 0).await.into();
+            result.unwrap();
+            read_buffer = buf;
+
+            for i in 0..10u64 {
+                let start = i as usize * 8;
+                let num = u64::from_le_bytes(read_buffer[start..start + 
8].try_into().unwrap());
+                assert_eq!(num, i, "Expected {} at position {}, got {}", i, 
start, num);
+            }
+        });
+    }
+
+    #[test]
+    fn test_direct_file_exact_sector_write() {
+        compio::runtime::Runtime::new().unwrap().block_on(async {
+            MemoryPool::init_pool(Arc::new(SystemConfig::default()));
+            let temp_dir = tempdir().unwrap();
+            let file_path = temp_dir.path().join("test_exact_sector.bin");
+
+            let mut direct_file = 
DirectFile::open(file_path.to_str().unwrap(), 0, false)
+                .await
+                .unwrap();
+
+            let data = vec![42u8; ALIGNMENT];
+            direct_file.write_all(&data).await.unwrap();
+
+            assert_eq!(direct_file.tail_len(), 0);
+            assert_eq!(direct_file.position(), ALIGNMENT as u64);
+
+            let file = OpenOptions::new()
+                .read(true)
+                .custom_flags(0x4000)
+                .open(&file_path)
+                .await
+                .unwrap();
+
+            let mut read_buffer = vec![0u8; ALIGNMENT];
+            let (result, buf) = file.read_at(read_buffer, 0).await.into();
+            result.unwrap();
+            read_buffer = buf;
+
+            assert_eq!(read_buffer, vec![42u8; ALIGNMENT]);
+        });
+    }
+
+    #[test]
+    fn test_direct_file_multiple_sector_writes() {
+        compio::runtime::Runtime::new().unwrap().block_on(async {
+            MemoryPool::init_pool(Arc::new(SystemConfig::default()));
+            let temp_dir = tempdir().unwrap();
+            let file_path = temp_dir.path().join("test_multiple_sectors.bin");
+
+            let mut direct_file = 
DirectFile::open(file_path.to_str().unwrap(), 0, false)
+                .await
+                .unwrap();
+
+            let data1 = vec![1u8; ALIGNMENT * 2];
+            let data2 = vec![2u8; ALIGNMENT * 3];
+            let data3 = vec![3u8; ALIGNMENT];
+
+            direct_file.write_all(&data1).await.unwrap();
+            direct_file.write_all(&data2).await.unwrap();
+            direct_file.write_all(&data3).await.unwrap();
+
+            let file = OpenOptions::new()
+                .read(true)
+                .custom_flags(0x4000)
+                .open(&file_path)
+                .await
+                .unwrap();
+
+            let mut read_buffer = vec![0u8; ALIGNMENT * 6];
+            let (result, buf) = file.read_at(read_buffer, 0).await.into();
+            result.unwrap();
+            read_buffer = buf;
+
+            assert_eq!(&read_buffer[0..ALIGNMENT * 2], &vec![1u8; ALIGNMENT * 
2]);
+            assert_eq!(
+                &read_buffer[ALIGNMENT * 2..ALIGNMENT * 5],
+                &vec![2u8; ALIGNMENT * 3]
+            );
+            assert_eq!(
+                &read_buffer[ALIGNMENT * 5..ALIGNMENT * 6],
+                &vec![3u8; ALIGNMENT]
+            );
+        });
+    }
+
+    #[test]
+    fn test_direct_file_unaligned_write() {
+        compio::runtime::Runtime::new().unwrap().block_on(async {
+            MemoryPool::init_pool(Arc::new(SystemConfig::default()));
+            let temp_dir = tempdir().unwrap();
+            let file_path = temp_dir.path().join("test_unaligned.bin");
+
+            let mut direct_file = 
DirectFile::open(file_path.to_str().unwrap(), 0, false)
+                .await
+                .unwrap();
+
+            let data = vec![77u8; 1000];
+            direct_file.write_all(&data).await.unwrap();
+
+            assert_eq!(direct_file.tail_len(), 1000 % ALIGNMENT);
+            assert_eq!(direct_file.position(), ALIGNMENT as u64);
+
+            direct_file.flush().await.unwrap();
+
+            assert_eq!(direct_file.tail_len(), 0);
+            assert_eq!(direct_file.position(), (ALIGNMENT * 2) as u64);
+
+            let file = OpenOptions::new()
+                .read(true)
+                .custom_flags(0x4000)
+                .open(&file_path)
+                .await
+                .unwrap();
+
+            let mut read_buffer = vec![0u8; ALIGNMENT * 2];
+            let (result, buf) = file.read_at(read_buffer, 0).await.into();
+            result.unwrap();
+            read_buffer = buf;
+
+            assert_eq!(&read_buffer[0..1000], &vec![77u8; 1000]);
+            assert_eq!(
+                &read_buffer[1000..ALIGNMENT * 2],
+                &vec![0u8; ALIGNMENT * 2 - 1000]
+            );
+        });
+    }
+
+    #[test]
+    fn test_direct_file_cross_sector_boundary() {
+        compio::runtime::Runtime::new().unwrap().block_on(async {
+            MemoryPool::init_pool(Arc::new(SystemConfig::default()));
+            let temp_dir = tempdir().unwrap();
+            let file_path = temp_dir.path().join("test_cross_boundary.bin");
+
+            let mut direct_file = 
DirectFile::open(file_path.to_str().unwrap(), 0, false)
+                .await
+                .unwrap();
+
+            let data1 = vec![1u8; 400];
+            let data2 = vec![2u8; 200];
+            let data3 = vec![3u8; 100];
+
+            direct_file.write_all(&data1).await.unwrap();
+            direct_file.write_all(&data2).await.unwrap();
+            direct_file.write_all(&data3).await.unwrap();
+
+            assert_eq!(direct_file.tail_len(), 700 % ALIGNMENT);
+
+            direct_file.flush().await.unwrap();
+
+            let file = OpenOptions::new()
+                .read(true)
+                .custom_flags(0x4000)
+                .open(&file_path)
+                .await
+                .unwrap();
+
+            let mut read_buffer = vec![0u8; ALIGNMENT * 2];
+            let (result, buf) = file.read_at(read_buffer, 0).await.into();
+            result.unwrap();
+            read_buffer = buf;
+
+            assert_eq!(&read_buffer[0..400], &vec![1u8; 400]);
+            assert_eq!(&read_buffer[400..600], &vec![2u8; 200]);
+            assert_eq!(&read_buffer[600..700], &vec![3u8; 100]);
+        });
+    }
+}
diff --git a/core/server/src/streaming/segments/indexes/index_reader.rs 
b/core/server/src/streaming/segments/indexes/index_reader.rs
index 48ed12b36..f0a589331 100644
--- a/core/server/src/streaming/segments/indexes/index_reader.rs
+++ b/core/server/src/streaming/segments/indexes/index_reader.rs
@@ -339,24 +339,15 @@ impl IndexReader {
         len: u32,
         use_pool: bool,
     ) -> Result<PooledBuffer, std::io::Error> {
-        if use_pool {
-            let len = len as usize;
-            let buf = PooledBuffer::with_capacity(len as usize);
-            let (result, buf) = self
-                .file
-                .read_exact_at(buf.slice(..len), offset as u64)
-                .await
-                .into();
-            let buf = buf.into_inner();
-            result?;
-            Ok(buf)
-        } else {
-            let mut buf = BytesMut::with_capacity(len as usize);
-            unsafe { buf.set_len(len as usize) };
-            let (result, buf) = self.file.read_exact_at(buf, offset as 
u64).await.into();
-            result?;
-            Ok(PooledBuffer::from_existing(buf))
-        }
+        let mut buf = PooledBuffer::with_capacity(len as usize);
+        let (result, buf) = self
+            .file
+            .read_exact_at(buf.slice(0..len as usize), offset as u64)
+            .await
+            .into();
+        result?;
+
+        Ok(buf.into_inner())
     }
 
     /// Gets the nth index from the index file.
diff --git a/core/server/src/streaming/segments/indexes/index_writer.rs 
b/core/server/src/streaming/segments/indexes/index_writer.rs
index 416cde219..782e633e1 100644
--- a/core/server/src/streaming/segments/indexes/index_writer.rs
+++ b/core/server/src/streaming/segments/indexes/index_writer.rs
@@ -16,25 +16,20 @@
  * under the License.
  */
 
-use compio::fs::File;
-use compio::fs::OpenOptions;
-use compio::io::AsyncWriteAtExt;
+use crate::streaming::segments::DirectFile;
+use crate::streaming::utils::PooledBuffer;
 use error_set::ErrContext;
-use iggy_common::INDEX_SIZE;
-use iggy_common::IggyError;
+use iggy_common::{INDEX_SIZE, IggyError};
 use std::sync::{
     Arc,
     atomic::{AtomicU64, Ordering},
 };
 use tracing::trace;
 
-use crate::streaming::utils::PooledBuffer;
-
 /// A dedicated struct for writing to the index file.
 #[derive(Debug)]
 pub struct IndexWriter {
-    file_path: String,
-    file: File,
+    direct_file: DirectFile,
     index_size_bytes: Arc<AtomicU64>,
     fsync: bool,
 }
@@ -47,39 +42,34 @@ impl IndexWriter {
         fsync: bool,
         file_exists: bool,
     ) -> Result<Self, IggyError> {
-        let file = OpenOptions::new()
-            .create(true)
-            .write(true)
-            .open(file_path)
-            .await
-            .with_error_context(|error| format!("Failed to open index file: 
{file_path}. {error}"))
-            .map_err(|_| IggyError::CannotReadFile)?;
+        let file_position = if file_exists {
+            let current_size = index_size_bytes.load(Ordering::Acquire);
+            (current_size + 511) & !511
+        } else {
+            index_size_bytes.store(0, Ordering::Release);
+            0
+        };
 
-        if file_exists {
-            let _ = file.sync_all().await.with_error_context(|error| {
-                format!("Failed to fsync index file after creation: 
{file_path}. {error}",)
-            });
+        trace!(
+            "Opening index file for writing: {file_path}, file_position: {}",
+            file_position
+        );
 
-            let actual_index_size = file
-                .metadata()
-                .await
-                .with_error_context(|error| {
-                    format!("Failed to get metadata of index file: 
{file_path}. {error}")
-                })
-                .map_err(|_| IggyError::CannotReadFileMetadata)?
-                .len();
+        let mut direct_file = DirectFile::open(file_path, file_position, 
file_exists).await?;
 
+        if file_exists {
+            let actual_index_size = direct_file.get_file_size().await?;
             index_size_bytes.store(actual_index_size, Ordering::Release);
-        }
 
-        trace!(
-            "Opened index file for writing: {file_path}, size: {}",
-            index_size_bytes.load(Ordering::Acquire)
-        );
+            trace!(
+                "Opened existing index file: {file_path}, size: {}, 
file_position: {}",
+                actual_index_size,
+                direct_file.position()
+            );
+        }
 
         Ok(Self {
-            file_path: file_path.to_string(),
-            file,
+            direct_file,
             index_size_bytes,
             fsync,
         })
@@ -92,44 +82,33 @@ impl IndexWriter {
         }
 
         let count = indexes.len() / INDEX_SIZE;
-        let len = indexes.len();
+        let actual_len = indexes.len();
 
-        let position = self.index_size_bytes.load(Ordering::Relaxed);
-        self.file
-            .write_all_at(indexes, position)
+        trace!(
+            "Saving {count} indexes to file: {} (size: {} bytes)",
+            self.direct_file.file_path(),
+            actual_len
+        );
+
+        let bytes_written = self
+            .direct_file
+            .write_all(&indexes)
             .await
-            .0
             .with_error_context(|error| {
                 format!(
                     "Failed to write {} indexes to file: {}. {error}",
-                    count, self.file_path
+                    count,
+                    self.direct_file.file_path()
                 )
             })
             .map_err(|_| IggyError::CannotSaveIndexToSegment)?;
 
+        let new_logical_size = self.index_size_bytes.load(Ordering::Relaxed) + 
bytes_written as u64;
         self.index_size_bytes
-            .fetch_add(len as u64, Ordering::Release);
-
-        if self.fsync {
-            let _ = self.fsync().await;
-        }
-        trace!(
-            "Saved {count} indexes of size {} to file: {}",
-            INDEX_SIZE * count,
-            self.file_path
-        );
+            .store(new_logical_size, Ordering::Release);
 
-        Ok(())
-    }
+        trace!("Saved {count} indexes. Logical size: {}", new_logical_size);
 
-    pub async fn fsync(&self) -> Result<(), IggyError> {
-        self.file
-            .sync_all()
-            .await
-            .with_error_context(|error| {
-                format!("Failed to fsync index file: {}. {error}", 
self.file_path)
-            })
-            .map_err(|_| IggyError::CannotWriteToFile)?;
         Ok(())
     }
 }
diff --git a/core/server/src/streaming/segments/messages/messages_reader.rs 
b/core/server/src/streaming/segments/messages/messages_reader.rs
index bd318fc9c..5ad235022 100644
--- a/core/server/src/streaming/segments/messages/messages_reader.rs
+++ b/core/server/src/streaming/segments/messages/messages_reader.rs
@@ -177,23 +177,13 @@ impl MessagesReader {
         len: u32,
         use_pool: bool,
     ) -> Result<PooledBuffer, std::io::Error> {
-        if use_pool {
-            let mut buf = PooledBuffer::with_capacity(len as usize);
-            let len = len as usize;
-            let (result, buf) = self
-                .file
-                .read_exact_at(buf.slice(..len), offset as u64)
-                .await
-                .into();
-            let buf = buf.into_inner();
-            result?;
-            Ok(buf)
-        } else {
-            let mut buf = BytesMut::with_capacity(len as usize);
-            unsafe { buf.set_len(len as usize) };
-            let (result, buf) = self.file.read_exact_at(buf, offset as 
u64).await.into();
-            result?;
-            Ok(PooledBuffer::from_existing(buf))
-        }
+        let mut buf = PooledBuffer::with_capacity(len as usize);
+        let (result, buf) = self
+            .file
+            .read_exact_at(buf.slice(0..len as usize), offset as u64)
+            .await
+            .into();
+        result?;
+        Ok(buf.into_inner())
     }
 }
diff --git a/core/server/src/streaming/segments/messages/messages_writer.rs 
b/core/server/src/streaming/segments/messages/messages_writer.rs
index f3d43d1e5..ad6a09680 100644
--- a/core/server/src/streaming/segments/messages/messages_writer.rs
+++ b/core/server/src/streaming/segments/messages/messages_writer.rs
@@ -16,8 +16,9 @@
  * under the License.
  */
 
-use crate::streaming::segments::{IggyMessagesBatchSet, messages::write_batch};
-use compio::fs::{File, OpenOptions};
+use crate::streaming::segments::{
+    DirectFile, IggyMessagesBatchSet, messages::write_batch_with_direct_file,
+};
 use error_set::ErrContext;
 use iggy_common::{IggyByteSize, IggyError};
 use std::sync::{
@@ -29,68 +30,51 @@ use tracing::{error, trace};
 /// A dedicated struct for writing to the messages file.
 #[derive(Debug)]
 pub struct MessagesWriter {
-    file_path: String,
-    /// Holds the file for synchronous writes; when asynchronous persistence 
is enabled, this will be None.
-    file: Option<File>,
-    /// When set, asynchronous writes are handled by this persister task.
+    direct_file: Option<DirectFile>,
     messages_size_bytes: Arc<AtomicU64>,
     fsync: bool,
 }
 
 impl MessagesWriter {
-    /// Opens the messages file in write mode.
-    ///
-    /// If the server confirmation is set to `NoWait`, the file handle is 
transferred to the
-    /// persister task (and stored in `persister_task`) so that writes are 
done asynchronously.
-    /// Otherwise, the file is retained in `self.file` for synchronous writes.
     pub async fn new(
         file_path: &str,
         messages_size_bytes: Arc<AtomicU64>,
         fsync: bool,
         file_exists: bool,
     ) -> Result<Self, IggyError> {
-        let file = OpenOptions::new()
-            .create(true)
-            .write(true)
-            .open(file_path)
-            .await
-            .with_error_context(|err| {
-                format!("Failed to open messages file: {file_path}, error: 
{err}")
-            })
-            .map_err(|_| IggyError::CannotReadFile)?;
+        let file_position = if file_exists {
+            let current_size = messages_size_bytes.load(Ordering::Acquire);
+            (current_size + 511) & !511
+        } else {
+            messages_size_bytes.store(0, Ordering::Release);
+            0
+        };
 
-        if file_exists {
-            let _ = file.sync_all().await.with_error_context(|error| {
-                format!("Failed to fsync messages file after creation: 
{file_path}, error: {error}")
-            });
+        trace!(
+            "Opening messages file for writing: {file_path}, file_position: 
{}",
+            file_position
+        );
 
-            let actual_messages_size = file
-                .metadata()
-                .await
-                .with_error_context(|error| {
-                    format!("Failed to get metadata of messages file: 
{file_path}, error: {error}")
-                })
-                .map_err(|_| IggyError::CannotReadFileMetadata)?
-                .len();
+        let mut direct_file = DirectFile::open(file_path, file_position, 
file_exists).await?;
 
+        if file_exists {
+            let actual_messages_size = direct_file.get_file_size().await?;
             messages_size_bytes.store(actual_messages_size, Ordering::Release);
-        }
 
-        trace!(
-            "Opened messages file for writing: {file_path}, size: {}",
-            messages_size_bytes.load(Ordering::Acquire)
-        );
+            trace!(
+                "Opened existing messages file: {file_path}, size: {}, 
file_position: {}",
+                actual_messages_size,
+                direct_file.position()
+            );
+        }
 
-        let file = Some(file);
         Ok(Self {
-            file_path: file_path.to_string(),
-            file,
+            direct_file: Some(direct_file),
             messages_size_bytes,
             fsync,
         })
     }
 
-    /// Append a batch of messages to the messages file.
     pub async fn save_batch_set(
         &mut self,
         batch_set: IggyMessagesBatchSet,
@@ -98,49 +82,34 @@ impl MessagesWriter {
         let messages_size = batch_set.size();
         let messages_count = batch_set.count();
         let containers_count = batch_set.containers_count();
-        trace!(
-            "Saving batch set of size {messages_size} bytes 
({containers_count} containers, {messages_count} messages) to messages file: 
{}",
-            self.file_path
-        );
-        let position = self.messages_size_bytes.load(Ordering::Relaxed);
-        if let Some(ref mut file) = self.file {
-            write_batch(file, position, batch_set)
+        let actual_written = if let Some(ref mut direct_file) = 
self.direct_file {
+            trace!(
+                "Saving batch set of size {messages_size} bytes 
({containers_count} containers, {messages_count} messages) to messages file: 
{}",
+                direct_file.file_path()
+            );
+
+            write_batch_with_direct_file(direct_file, batch_set)
                 .await
                 .with_error_context(|error| {
                     format!(
                         "Failed to write batch to messages file: {}. {error}",
-                        self.file_path
+                        direct_file.file_path()
                     )
-                })?;
+                })?
         } else {
-            error!("File handle is not available for synchronous write.");
+            tracing::error!("File handle is not available for synchronous 
write.");
             return Err(IggyError::CannotWriteToFile);
-        }
-
-        if self.fsync {
-            let _ = self.fsync().await;
-        }
+        };
 
+        let logical_size = self.messages_size_bytes.load(Ordering::Relaxed) + 
actual_written as u64;
         self.messages_size_bytes
-            .fetch_add(messages_size as u64, Ordering::Release);
+            .store(logical_size, Ordering::Release);
+
         trace!(
-            "Written batch set of size {messages_size} bytes 
({containers_count} containers, {messages_count} messages) to disk messages 
file: {}",
-            self.file_path
+            "Written batch set of size {messages_size} bytes to disk. Logical 
size: {}",
+            logical_size
         );
 
         Ok(IggyByteSize::from(messages_size as u64))
     }
-
-    pub async fn fsync(&self) -> Result<(), IggyError> {
-        if let Some(file) = self.file.as_ref() {
-            file.sync_all()
-                .await
-                .with_error_context(|error| {
-                    format!("Failed to fsync messages file: {}. {error}", 
self.file_path)
-                })
-                .map_err(|_| IggyError::CannotWriteToFile)?;
-        }
-
-        Ok(())
-    }
 }
diff --git a/core/server/src/streaming/segments/messages/mod.rs 
b/core/server/src/streaming/segments/messages/mod.rs
index a3989a2eb..537ecb78c 100644
--- a/core/server/src/streaming/segments/messages/mod.rs
+++ b/core/server/src/streaming/segments/messages/mod.rs
@@ -19,25 +19,27 @@
 mod messages_reader;
 mod messages_writer;
 
-use super::IggyMessagesBatchSet;
-use compio::{fs::File, io::AsyncWriteAtExt};
+use super::{DirectFile, IggyMessagesBatchSet};
+use crate::streaming::utils::PooledBuffer;
 use iggy_common::IggyError;
 
 pub use messages_reader::MessagesReader;
 pub use messages_writer::MessagesWriter;
 
-/// Vectored write a batches of messages to file
-async fn write_batch(
-    file: &mut File,
-    position: u64,
+async fn write_batch_with_direct_file(
+    direct_file: &mut DirectFile,
     mut batches: IggyMessagesBatchSet,
 ) -> Result<usize, IggyError> {
-    let total_written = batches.iter().map(|b| b.size() as usize).sum();
-    let batches = batches
-        .iter_mut()
-        .map(|b| b.take_messages())
-        .collect::<Vec<_>>();
-    let (result, _) = file.write_vectored_all_at(batches, 
position).await.into();
-    result.map_err(|_| IggyError::CannotWriteToFile)?;
+    let total_written: usize = batches.iter().map(|b| b.size() as usize).sum();
+    let mut messages_count = 0;
+
+    for batch in batches.iter_mut() {
+        messages_count += batch.count();
+        let messages = batch.take_messages();
+        direct_file.write_all(&messages).await?;
+    }
+
+    tracing::trace!("Saved {} messages", messages_count);
+
     Ok(total_written)
 }
diff --git a/core/server/src/streaming/segments/mod.rs 
b/core/server/src/streaming/segments/mod.rs
index f3105e547..9fbd4abe6 100644
--- a/core/server/src/streaming/segments/mod.rs
+++ b/core/server/src/streaming/segments/mod.rs
@@ -16,6 +16,7 @@
  * under the License.
  */
 
+mod direct_file;
 mod indexes;
 mod messages;
 mod messages_accumulator;
@@ -24,6 +25,7 @@ mod segment;
 mod types;
 mod writing_messages;
 
+pub use direct_file::DirectFile;
 pub use indexes::IggyIndexesMut;
 pub use messages_accumulator::MessagesAccumulator;
 pub use segment::Segment;
diff --git a/core/server/src/streaming/segments/segment.rs 
b/core/server/src/streaming/segments/segment.rs
index a3ac8588f..85dc3abb8 100644
--- a/core/server/src/streaming/segments/segment.rs
+++ b/core/server/src/streaming/segments/segment.rs
@@ -92,6 +92,7 @@ impl Segment {
             IggyExpiry::ServerDefault => config.segment.message_expiry,
             _ => message_expiry,
         };
+
         Segment {
             stream_id,
             topic_id,
@@ -150,22 +151,16 @@ impl Segment {
             .with_error_context(|error| format!("Failed to load indexes for 
{self}. {error}"))
             .map_err(|_| IggyError::CannotReadFile)?;
 
-        info!(
-            "Loaded {} indexes for segment with start offset: {}, end offset: 
{}, and partition with ID: {}, topic with ID: {}, and stream with ID: {}.",
-            self.indexes.as_ref().map_or(0, |idx| idx.count()),
-            self.start_offset,
-            self.end_offset,
-            self.partition_id,
-            self.topic_id,
-            self.stream_id
-        );
-
         let last_index_offset = if loaded_indexes.is_empty() {
             0_u64
         } else {
             loaded_indexes.last().unwrap().offset() as u64
         };
 
+        if !loaded_indexes.is_empty() {
+            self.indexes = Some(loaded_indexes);
+        }
+
         self.end_offset = self.start_offset + last_index_offset;
 
         info!(
@@ -306,29 +301,8 @@ impl Segment {
     }
 
     pub async fn shutdown_writing(&mut self) {
-        if let Some(log_writer) = self.messages_writer.take() {
-            //TODO: Fixme not sure whether we should spawn a task here.
-            compio::runtime::spawn(async move {
-                let _ = log_writer.fsync().await;
-            })
-            .detach();
-        } else {
-            warn!(
-                "Log writer already closed when calling close() for {}",
-                self
-            );
-        }
-
-        if let Some(index_writer) = self.index_writer.take() {
-            //TODO: Fixme not sure whether we should spawn a task here.
-            compio::runtime::spawn(async move {
-                let _ = index_writer.fsync().await;
-                drop(index_writer)
-            })
-            .detach();
-        } else {
-            warn!("Index writer already closed when calling close()");
-        }
+        let _ = self.messages_writer.take().map(|mut writer| {});
+        let _ = self.index_writer.take().map(|mut writer| {});
     }
 
     pub async fn delete(&mut self) -> Result<(), IggyError> {
diff --git a/core/server/src/streaming/segments/types/messages_batch_mut.rs 
b/core/server/src/streaming/segments/types/messages_batch_mut.rs
index 764faa112..7152f53d5 100644
--- a/core/server/src/streaming/segments/types/messages_batch_mut.rs
+++ b/core/server/src/streaming/segments/types/messages_batch_mut.rs
@@ -488,7 +488,7 @@ impl IggyMessagesBatchMut {
     /// subsequent messages in the new buffer.
     #[allow(clippy::too_many_arguments)]
     fn rebuild_indexes_for_chunk(
-        new_buffer: &BytesMut,
+        new_buffer: &PooledBuffer,
         new_indexes: &mut IggyIndexesMut,
         offset_in_new_buffer: &mut u32,
         chunk_start: usize,
@@ -571,7 +571,7 @@ impl IggyMessagesBatchMut {
 
         for &(start, end) in &boundaries_to_remove {
             if start > last_pos {
-                let keep_len = start - last_pos;
+                let keep_len: usize = start - last_pos;
                 let chunk = source.split_to(keep_len);
                 let chunk_start_in_new_buffer = new_buffer.len();
                 new_buffer.put(chunk);
@@ -684,7 +684,8 @@ impl IggyMessagesBatchMut {
 
             prev_offset = message.header().offset();
             prev_position = index.position();
-            messages_size += message.size();
+            let msg_size = message.size();
+            messages_size += msg_size;
             messages_count += 1;
         }
 
@@ -822,11 +823,3 @@ impl Index<usize> for IggyMessagesBatchMut {
         &self.messages[start..end]
     }
 }
-
-impl Deref for IggyMessagesBatchMut {
-    type Target = BytesMut;
-
-    fn deref(&self) -> &Self::Target {
-        &self.messages
-    }
-}
diff --git a/core/server/src/streaming/segments/writing_messages.rs 
b/core/server/src/streaming/segments/writing_messages.rs
index e2b0fbb99..0d9b9f123 100644
--- a/core/server/src/streaming/segments/writing_messages.rs
+++ b/core/server/src/streaming/segments/writing_messages.rs
@@ -113,14 +113,20 @@ impl Segment {
         self.indexes.as_mut().unwrap().mark_saved();
 
         if self.config.segment.cache_indexes == CacheIndexesConfig::None {
-            self.indexes.as_mut().unwrap().clear();
+            if let Some(indexes) = self.indexes.as_mut() {
+                indexes.clear();
+            }
         }
 
         self.check_and_handle_segment_full().await?;
 
         trace!(
-            "Saved {} messages on disk in segment with start offset: {} for 
partition with ID: {}, total bytes written: {}.",
-            unsaved_messages_count, self.start_offset, self.partition_id, 
saved_bytes
+            "Saved {} messages on disk in segment with start offset: {}, end 
offset: {}, for partition with ID: {}, total bytes written: {}.",
+            unsaved_messages_count,
+            self.start_offset,
+            self.end_offset,
+            self.partition_id,
+            saved_bytes
         );
 
         Ok(unsaved_messages_count)
diff --git a/core/server/src/streaming/utils/memory_pool.rs 
b/core/server/src/streaming/utils/memory_pool.rs
index 2cc031d3b..df0783b14 100644
--- a/core/server/src/streaming/utils/memory_pool.rs
+++ b/core/server/src/streaming/utils/memory_pool.rs
@@ -17,7 +17,7 @@
  */
 
 use crate::configs::system::SystemConfig;
-use bytes::BytesMut;
+use aligned_vec::{AVec, ConstAlign};
 use crossbeam::queue::ArrayQueue;
 use human_repr::HumanCount;
 use once_cell::sync::OnceCell;
@@ -26,15 +26,18 @@ use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use tracing::{error, info, trace, warn};
 
+pub const ALIGNMENT: usize = 512;
+pub type Align512 = ConstAlign<ALIGNMENT>;
+pub type AlignedBuffer = AVec<u8, Align512>;
+
 /// Global memory pool instance. Use `memory_pool()` to access it.
 pub static MEMORY_POOL: OnceCell<MemoryPool> = OnceCell::new();
 
 /// Total number of distinct bucket sizes.
-const NUM_BUCKETS: usize = 32;
+const NUM_BUCKETS: usize = 31;
 
 /// Array of bucket sizes in ascending order. Each entry is a distinct buffer 
size (in bytes).
 const BUCKET_SIZES: [usize; NUM_BUCKETS] = [
-    256,
     512,
     1024,
     2 * 1024,
@@ -49,8 +52,8 @@ const BUCKET_SIZES: [usize; NUM_BUCKETS] = [
     768 * 1024,
     1024 * 1024,
     1536 * 1024,
-    2 * 1024 * 1024, // Above 2MiB everything should be rounded up to the next 
power of 2 to take advantage of hugepages
-    4 * 1024 * 1024, // (environment variables MIMALLOC_ALLOW_LARGE_OS_PAGES=1 
and MIMALLOC_LARGE_OS_PAGES=1).
+    2 * 1024 * 1024,
+    4 * 1024 * 1024,
     6 * 1024 * 1024,
     8 * 1024 * 1024,
     10 * 1024 * 1024,
@@ -75,7 +78,7 @@ pub fn memory_pool() -> &'static MemoryPool {
         .expect("Memory pool not initialized - MemoryPool::init_pool should be 
called first")
 }
 
-/// A memory pool that maintains fixed-size buckets for reusing `BytesMut` 
buffers.
+/// A memory pool that maintains fixed-size buckets for reusing 
`AlignedBuffer` buffers.
 ///
 /// Each bucket corresponds to a particular size in `BUCKET_SIZES`. The pool 
tracks:
 /// - Buffers currently in use (`in_use`)
@@ -97,7 +100,7 @@ pub struct MemoryPool {
     /// Array of queues for reusable buffers. Each queue can store up to 
`bucket_capacity` buffers.
     /// The length of each queue (`buckets[i].len()`) is how many **free** 
buffers are currently available.
     /// Free doesn't mean the buffer is allocated, it just means it's not in 
use.
-    buckets: [Arc<ArrayQueue<BytesMut>>; NUM_BUCKETS],
+    buckets: [Arc<ArrayQueue<AlignedBuffer>>; NUM_BUCKETS],
 
     /// Number of buffers **in use** for each bucket size (grow/shrink as they 
are acquired/released).
     in_use: [Arc<AtomicUsize>; NUM_BUCKETS],
@@ -163,16 +166,16 @@ impl MemoryPool {
             MEMORY_POOL.get_or_init(|| MemoryPool::new(is_enabled, 
memory_limit, bucket_capacity));
     }
 
-    /// Acquire a `BytesMut` buffer with at least `capacity` bytes.
+    /// Acquire a `AlignedBuffer` buffer with at least `capacity` bytes.
     ///
     /// - If a bucket can fit `capacity`, try to pop from its free buffer 
queue; otherwise create a new buffer.
     /// - If `memory_limit` would be exceeded, allocate outside the pool.
     ///
     /// Returns a tuple of (buffer, was_pool_allocated) where 
was_pool_allocated indicates if the buffer
     /// was allocated from the pool (true) or externally (false).
-    pub fn acquire_buffer(&self, capacity: usize) -> (BytesMut, bool) {
+    pub fn acquire_buffer(&self, capacity: usize) -> (AlignedBuffer, bool) {
         if !self.is_enabled {
-            return (BytesMut::with_capacity(capacity), false);
+            return (AlignedBuffer::with_capacity(ALIGNMENT, capacity), false);
         }
 
         let current = self.pool_current_size();
@@ -193,12 +196,12 @@ impl MemoryPool {
                         new_size, current, self.memory_limit
                     );
                     self.inc_external_allocations();
-                    return (BytesMut::with_capacity(new_size), false);
+                    return (AlignedBuffer::with_capacity(ALIGNMENT, new_size), 
false);
                 }
 
                 self.inc_bucket_alloc(idx);
                 self.inc_bucket_in_use(idx);
-                (BytesMut::with_capacity(new_size), true)
+                (AlignedBuffer::with_capacity(ALIGNMENT, new_size), true)
             }
             None => {
                 if current + capacity > self.memory_limit {
@@ -207,16 +210,16 @@ impl MemoryPool {
                         capacity, current, self.memory_limit
                     );
                     self.inc_external_allocations();
-                    return (BytesMut::with_capacity(capacity), false);
+                    return (AlignedBuffer::with_capacity(ALIGNMENT, capacity), 
false);
                 }
 
                 self.inc_external_allocations();
-                (BytesMut::with_capacity(capacity), false)
+                (AlignedBuffer::with_capacity(ALIGNMENT, capacity), false)
             }
         }
     }
 
-    /// Return a `BytesMut` buffer previously acquired from the pool.
+    /// Return a `AlignedBuffer` buffer previously acquired from the pool.
     ///
     /// - If `current_capacity` differs from `original_capacity`, increments 
`resize_events`.
     /// - If a matching bucket exists, place it back in that bucket's queue 
(if space is available).
@@ -224,7 +227,7 @@ impl MemoryPool {
     /// - The `was_pool_allocated` flag indicates if this buffer was 
originally allocated from the pool.
     pub fn release_buffer(
         &self,
-        buffer: BytesMut,
+        buffer: AlignedBuffer,
         original_capacity: usize,
         was_pool_allocated: bool,
     ) {
@@ -235,10 +238,6 @@ impl MemoryPool {
         let current_capacity = buffer.capacity();
         if current_capacity != original_capacity {
             self.inc_resize_events();
-            trace!(
-                "Buffer capacity {} != original {} when returning",
-                current_capacity, original_capacity
-            );
         }
 
         if was_pool_allocated {
@@ -438,11 +437,11 @@ impl MemoryPool {
 
 /// Return a buffer to the pool by calling `release_buffer` with the original 
capacity.
 /// This extension trait makes it easy to do 
`some_bytes.return_to_pool(orig_cap, was_pool_allocated)`.
-pub trait BytesMutExt {
+pub trait AlignedBufferExt {
     fn return_to_pool(self, original_capacity: usize, was_pool_allocated: 
bool);
 }
 
-impl BytesMutExt for BytesMut {
+impl AlignedBufferExt for AlignedBuffer {
     fn return_to_pool(self, original_capacity: usize, was_pool_allocated: 
bool) {
         memory_pool().release_buffer(self, original_capacity, 
was_pool_allocated);
     }
@@ -667,27 +666,6 @@ mod tests {
             );
         }
 
-        // Test put_bytes
-        {
-            let initial_events = pool.resize_events();
-            let mut buffer = PooledBuffer::with_capacity(4 * 1024);
-            let orig_bucket_idx = pool.best_fit(buffer.capacity()).unwrap();
-            let orig_in_use = pool.bucket_current_elements(orig_bucket_idx);
-
-            buffer.put_bytes(0, 64 * 1024); // 64KiB of zeros
-
-            assert_eq!(
-                pool.resize_events(),
-                initial_events + 1,
-                "put_bytes should trigger resize event"
-            );
-            assert_eq!(
-                pool.bucket_current_elements(orig_bucket_idx),
-                orig_in_use - 1,
-                "put_bytes should update bucket accounting"
-            );
-        }
-
         // Test extend_from_slice
         {
             let initial_events = pool.resize_events();
diff --git a/core/server/src/streaming/utils/mod.rs 
b/core/server/src/streaming/utils/mod.rs
index e441a051b..d858d684b 100644
--- a/core/server/src/streaming/utils/mod.rs
+++ b/core/server/src/streaming/utils/mod.rs
@@ -25,5 +25,5 @@ pub mod random_id;
 mod memory_pool;
 mod pooled_buffer;
 
-pub use memory_pool::{MemoryPool, memory_pool};
+pub use memory_pool::{ALIGNMENT, MemoryPool, memory_pool};
 pub use pooled_buffer::PooledBuffer;
diff --git a/core/server/src/streaming/utils/pooled_buffer.rs 
b/core/server/src/streaming/utils/pooled_buffer.rs
index ee6f20148..b3769bf27 100644
--- a/core/server/src/streaming/utils/pooled_buffer.rs
+++ b/core/server/src/streaming/utils/pooled_buffer.rs
@@ -16,8 +16,8 @@
  * under the License.
  */
 
-use super::memory_pool::{BytesMutExt, memory_pool};
-use bytes::{Buf, BufMut, BytesMut};
+use super::memory_pool::{AlignedBuffer, AlignedBufferExt, memory_pool};
+use crate::streaming::utils::memory_pool::ALIGNMENT;
 use compio::buf::{IoBuf, IoBufMut, SetBufInit};
 use std::ops::{Deref, DerefMut};
 
@@ -26,7 +26,7 @@ pub struct PooledBuffer {
     from_pool: bool,
     original_capacity: usize,
     original_bucket_idx: Option<usize>,
-    inner: BytesMut,
+    inner: AlignedBuffer,
 }
 
 impl Default for PooledBuffer {
@@ -42,13 +42,14 @@ impl PooledBuffer {
     ///
     /// * `capacity` - The capacity of the buffer
     pub fn with_capacity(capacity: usize) -> Self {
-        let (buffer, was_pool_allocated) = 
memory_pool().acquire_buffer(capacity);
+        let (mut buffer, was_pool_allocated) = 
memory_pool().acquire_buffer(capacity);
         let original_capacity = buffer.capacity();
         let original_bucket_idx = if was_pool_allocated {
             memory_pool().best_fit(original_capacity)
         } else {
             None
         };
+
         Self {
             from_pool: was_pool_allocated,
             original_capacity,
@@ -57,27 +58,13 @@ impl PooledBuffer {
         }
     }
 
-    /// Creates a new pooled buffer from an existing `BytesMut`.
-    ///
-    /// # Arguments
-    ///
-    /// * `existing` - The existing `BytesMut` buffer
-    pub fn from_existing(existing: BytesMut) -> Self {
-        Self {
-            from_pool: false,
-            original_capacity: existing.capacity(),
-            original_bucket_idx: None,
-            inner: existing,
-        }
-    }
-
     /// Creates an empty pooled buffer.
     pub fn empty() -> Self {
         Self {
             from_pool: false,
             original_capacity: 0,
             original_bucket_idx: None,
-            inner: BytesMut::new(),
+            inner: AlignedBuffer::new(ALIGNMENT),
         }
     }
 
@@ -90,6 +77,11 @@ impl PooledBuffer {
 
         let current_capacity = self.inner.capacity();
         if current_capacity != self.original_capacity {
+            tracing::error!(
+                "Pooled buffer resized from {} to {}",
+                self.original_capacity,
+                current_capacity
+            );
             memory_pool().inc_resize_events();
 
             if let Some(orig_idx) = self.original_bucket_idx {
@@ -131,49 +123,111 @@ impl PooledBuffer {
         }
     }
 
-    /// Wrapper for put_bytes which might cause resize
-    pub fn put_bytes(&mut self, byte: u8, len: usize) {
+    /// Wrapper for put_slice which might cause resize
+    pub fn put_slice(&mut self, src: &[u8]) {
         let before_cap = self.inner.capacity();
-        self.inner.put_bytes(byte, len);
+        self.extend_from_slice(src);
 
         if self.inner.capacity() != before_cap {
             self.check_for_resize();
         }
     }
 
-    /// Wrapper for put_slice which might cause resize
-    pub fn put_slice(&mut self, src: &[u8]) {
+    /// Wrapper for put_u32_le which might cause resize
+    pub fn put_u32_le(&mut self, value: u32) {
         let before_cap = self.inner.capacity();
-        self.inner.put_slice(src);
+        self.reserve(4);
+        self.inner.extend_from_slice(&value.to_le_bytes());
 
         if self.inner.capacity() != before_cap {
             self.check_for_resize();
         }
     }
 
-    /// Wrapper for put_u32_le which might cause resize
-    pub fn put_u32_le(&mut self, value: u32) {
+    /// Wrapper for put_u64_le which might cause resize
+    pub fn put_u64_le(&mut self, value: u64) {
         let before_cap = self.inner.capacity();
-        self.inner.put_u32_le(value);
+        self.reserve(8);
+        self.inner.extend_from_slice(&value.to_le_bytes());
 
         if self.inner.capacity() != before_cap {
             self.check_for_resize();
         }
     }
 
-    /// Wrapper for put_u64_le which might cause resize
-    pub fn put_u64_le(&mut self, value: u64) {
+    /// Get a slice of the buffer's contents
+    pub fn as_slice(&self) -> &[u8] {
+        self.inner.as_slice()
+    }
+
+    /// Get the length of the buffer
+    pub fn len(&self) -> usize {
+        self.inner.len()
+    }
+
+    /// Check if the buffer is empty
+    pub fn is_empty(&self) -> bool {
+        self.inner.is_empty()
+    }
+
+    /// Get the capacity of the buffer
+    pub fn capacity(&self) -> usize {
+        self.inner.capacity()
+    }
+
+    /// Clear the buffer
+    pub fn clear(&mut self) {
+        self.inner.clear()
+    }
+
+    /// Resize the buffer
+    pub fn resize(&mut self, new_len: usize, value: u8) {
         let before_cap = self.inner.capacity();
-        self.inner.put_u64_le(value);
+        self.inner.resize(new_len, value);
 
         if self.inner.capacity() != before_cap {
             self.check_for_resize();
         }
     }
+
+    /// Split the buffer at the given index, returning the data before the 
split
+    /// and keeping the data after the split in self
+    pub fn split_to(&mut self, at: usize) -> Vec<u8> {
+        if at > self.inner.len() {
+            panic!("split_to out of bounds");
+        }
+
+        let mut result = Vec::with_capacity(at);
+        result.extend_from_slice(&self.inner[..at]);
+
+        let remaining = self.inner.len() - at;
+        let mut new_data = Vec::with_capacity(remaining);
+        new_data.extend_from_slice(&self.inner[at..]);
+
+        self.inner.clear();
+        self.inner.extend_from_slice(&new_data);
+
+        result
+    }
+
+    /// Put bytes from a slice
+    pub fn put<T: AsRef<[u8]>>(&mut self, data: T) {
+        self.extend_from_slice(data.as_ref());
+    }
+
+    /// Align the buffer length to the next 512-byte boundary by padding with 
zeros
+    pub fn align(&mut self) {
+        let current_len = self.inner.len();
+        let aligned_len = (current_len + 511) & !511;
+        if aligned_len > current_len {
+            let padding = aligned_len - current_len;
+            self.resize(aligned_len, 0);
+        }
+    }
 }
 
 impl Deref for PooledBuffer {
-    type Target = BytesMut;
+    type Target = AlignedBuffer;
 
     fn deref(&self) -> &Self::Target {
         &self.inner
@@ -189,12 +243,18 @@ impl DerefMut for PooledBuffer {
 impl Drop for PooledBuffer {
     fn drop(&mut self) {
         if self.from_pool {
-            let buf = std::mem::take(&mut self.inner);
+            let buf = std::mem::replace(&mut self.inner, 
AlignedBuffer::new(ALIGNMENT));
             buf.return_to_pool(self.original_capacity, true);
         }
     }
 }
 
+impl AsRef<[u8]> for PooledBuffer {
+    fn as_ref(&self) -> &[u8] {
+        self.inner.as_slice()
+    }
+}
+
 impl From<&[u8]> for PooledBuffer {
     fn from(slice: &[u8]) -> Self {
         let mut buf = PooledBuffer::with_capacity(slice.len());
@@ -203,30 +263,10 @@ impl From<&[u8]> for PooledBuffer {
     }
 }
 
-impl Buf for PooledBuffer {
-    fn remaining(&self) -> usize {
-        self.inner.remaining()
-    }
-
-    fn chunk(&self) -> &[u8] {
-        self.inner.chunk()
-    }
-
-    fn advance(&mut self, cnt: usize) {
-        self.inner.advance(cnt)
-    }
-
-    fn chunks_vectored<'t>(&'t self, dst: &mut [std::io::IoSlice<'t>]) -> 
usize {
-        self.inner.chunks_vectored(dst)
-    }
-}
-
 impl SetBufInit for PooledBuffer {
     unsafe fn set_buf_init(&mut self, len: usize) {
-        if self.inner.len() <= len {
-            unsafe {
-                self.inner.set_len(len);
-            }
+        unsafe {
+            self.inner.set_len(len);
         }
     }
 }
@@ -239,7 +279,7 @@ unsafe impl IoBufMut for PooledBuffer {
 
 unsafe impl IoBuf for PooledBuffer {
     fn as_buf_ptr(&self) -> *const u8 {
-        self.inner.as_buf_ptr()
+        self.inner.as_ptr()
     }
 
     fn buf_len(&self) -> usize {
diff --git a/core/server/src/tcp/connection_handler.rs 
b/core/server/src/tcp/connection_handler.rs
index 9c6803b8f..1f1bb4827 100644
--- a/core/server/src/tcp/connection_handler.rs
+++ b/core/server/src/tcp/connection_handler.rs
@@ -49,7 +49,7 @@ pub(crate) async fn handle_connection(
     loop {
         let read_future = sender.read(length_buffer.clone());
 
-        let (read_length, initial_buffer) = futures::select! {
+        let initial_buffer = futures::select! {
             _ = stop_receiver.recv().fuse() => {
                 info!("Connection stop signal received for session: {}", 
session);
                 let _ = 
sender.send_error_response(IggyError::Disconnected).await;
@@ -57,8 +57,8 @@ pub(crate) async fn handle_connection(
             }
             result = read_future.fuse() => {
                 match result {
-                    (Ok(read_length), initial_buffer) => (read_length, 
initial_buffer),
-                    (Err(error), _) => {
+                    Ok(initial_buffer) => initial_buffer,
+                    Err(error) => {
                         if error.as_code() == 
IggyError::ConnectionClosed.as_code() {
                             return Err(ConnectionError::from(error));
                         } else {
@@ -71,9 +71,9 @@ pub(crate) async fn handle_connection(
             }
         };
 
-        if read_length != INITIAL_BYTES_LENGTH {
+        if initial_buffer.len() != INITIAL_BYTES_LENGTH {
             sender.send_error_response(IggyError::CommandLengthError(format!(
-                "Unable to read the TCP request length, expected: 
{INITIAL_BYTES_LENGTH} bytes, received: {read_length} bytes."
+                "Unable to read the TCP request length, expected: 
{INITIAL_BYTES_LENGTH} bytes, received: {} bytes.", initial_buffer.len()
             ))).await?;
             continue;
         }
@@ -81,8 +81,7 @@ pub(crate) async fn handle_connection(
         let initial_buffer = initial_buffer.freeze();
         let length =
             
u32::from_le_bytes(initial_buffer[0..INITIAL_BYTES_LENGTH].try_into().unwrap());
-        let (res, code_buffer) = sender.read(code_buffer.clone()).await;
-        let _ = res?;
+        let code_buffer = sender.read(code_buffer.clone()).await?;
         let code_buffer = code_buffer.freeze();
         let code: u32 =
             
u32::from_le_bytes(code_buffer[0..INITIAL_BYTES_LENGTH].try_into().unwrap());
diff --git a/core/server/src/tcp/sender.rs b/core/server/src/tcp/sender.rs
index 2c0ecfa9c..f757a69b6 100644
--- a/core/server/src/tcp/sender.rs
+++ b/core/server/src/tcp/sender.rs
@@ -23,31 +23,25 @@ use compio::{
     io::{AsyncRead, AsyncReadAtExt, AsyncReadExt, AsyncWriteExt},
 };
 use iggy_common::IggyError;
-use nix::libc;
-use std::io::IoSlice;
 use tracing::{debug, error};
 
 use crate::streaming::utils::PooledBuffer;
 
 const STATUS_OK: &[u8] = &[0; 4];
 
-pub(crate) async fn read<T, B>(stream: &mut T, buffer: B) -> (Result<usize, 
IggyError>, B)
+pub(crate) async fn read<T, B>(stream: &mut T, buffer: B) -> Result<B, 
IggyError>
 where
     T: AsyncReadExt + AsyncWriteExt + Unpin,
     B: IoBufMut,
 {
     let BufResult(result, buffer) = stream.read_exact(buffer).await;
-    match (result, buffer) {
-        (Ok(_), buffer) => (Ok(buffer.buf_len()), buffer),
-        // TODO: How to handle this ?(Ok(0), buffer) => 
(Err(IggyError::ConnectionClosed), buffer),
-        // `read_exact` from compio doesn't return how many bytes it read.
-        (Err(error), buffer) => {
+    match result {
+        Ok(_) => Ok(buffer),
+        Err(error) => {
             if error.kind() == std::io::ErrorKind::UnexpectedEof {
-                //error!("Got some error tho.. {}", error);
-                (Err(IggyError::ConnectionClosed), buffer)
+                Err(IggyError::ConnectionClosed)
             } else {
-                //error!("Got some other error tho.. {}", error);
-                (Err(IggyError::TcpError), buffer)
+                Err(IggyError::TcpError)
             }
         }
     }
diff --git a/core/server/src/tcp/tcp_sender.rs 
b/core/server/src/tcp/tcp_sender.rs
index d0a1c27c9..ccdc95d24 100644
--- a/core/server/src/tcp/tcp_sender.rs
+++ b/core/server/src/tcp/tcp_sender.rs
@@ -20,13 +20,11 @@ use crate::binary::sender::Sender;
 use crate::streaming::utils::PooledBuffer;
 use crate::tcp::COMPONENT;
 use crate::{server_error::ServerError, tcp::sender};
-use bytes::BytesMut;
-use compio::buf::{IoBuf, IoBufMut};
+use compio::buf::IoBufMut;
 use compio::io::AsyncWrite;
 use compio::net::TcpStream;
 use error_set::ErrContext;
 use iggy_common::IggyError;
-use nix::libc;
 
 #[derive(Debug)]
 pub struct TcpSender {
@@ -34,7 +32,7 @@ pub struct TcpSender {
 }
 
 impl Sender for TcpSender {
-    async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<usize, 
IggyError>, B) {
+    async fn read<B: IoBufMut>(&mut self, buffer: B) -> Result<B, IggyError> {
         sender::read(&mut self.stream, buffer).await
     }
 
diff --git a/core/server/src/tcp/tcp_tls_sender.rs 
b/core/server/src/tcp/tcp_tls_sender.rs
index 8dbb3972b..f0dfcdf49 100644
--- a/core/server/src/tcp/tcp_tls_sender.rs
+++ b/core/server/src/tcp/tcp_tls_sender.rs
@@ -35,7 +35,7 @@ pub struct TcpTlsSender {
 }
 
 impl Sender for TcpTlsSender {
-    async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<usize, 
IggyError>, B) {
+    async fn read<B: IoBufMut>(&mut self, buffer: B) -> Result<B, IggyError> {
         todo!();
         sender::read(&mut self.stream, buffer).await
     }

Reply via email to