This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch message_bus_impl
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit afdfbb6d837f9d742055333f5c59c784782a0ad0
Author: numminex <[email protected]>
AuthorDate: Mon Dec 1 16:03:30 2025 +0100

    Something
---
 Cargo.lock                                         |   9 +
 Cargo.toml                                         |   2 +
 core/common/Cargo.toml                             |   9 +
 core/common/src/alloc/buffer.rs                    | 285 ++++++++
 core/common/src/alloc/memory_pool.rs               | 737 +++++++++++++++++++++
 core/common/src/alloc/mod.rs                       |  21 +
 core/common/src/lib.rs                             |   3 +
 core/common/src/types/mod.rs                       |   1 +
 core/common/src/types/sender/mod.rs                | 230 +++++++
 core/common/src/types/sender/quic_sender.rs        | 142 ++++
 core/common/src/types/sender/tcp_sender.rs         |  66 ++
 core/common/src/types/sender/tcp_tls_sender.rs     |  87 +++
 core/common/src/types/sender/websocket_sender.rs   | 208 ++++++
 .../src/types/sender/websocket_tls_sender.rs       | 188 ++++++
 core/message_bus/src/lib.rs                        |   5 +-
 core/server/Cargo.toml                             |   4 +-
 core/server/src/configs/defaults.rs                |   3 +-
 core/server/src/configs/system.rs                  |   7 -
 18 files changed, 1995 insertions(+), 12 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index e2eb2ae3e..fc4d14e87 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4764,10 +4764,19 @@ dependencies = [
  "chrono",
  "clap",
  "comfy-table",
+ "compio",
+ "compio-quic",
+ "compio-tls",
+ "compio-ws",
+ "crossbeam",
  "derive_more 2.0.1",
+ "err_trail",
+ "error_set",
  "fast-async-mutex",
  "figment",
+ "human-repr",
  "humantime",
+ "once_cell",
  "rcgen",
  "rustls",
  "serde",
diff --git a/Cargo.toml b/Cargo.toml
index 8254764e3..8f9752608 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -122,6 +122,8 @@ dlopen2 = "0.8.0"
 dotenvy = "0.15.7"
 enum_dispatch = "0.3.13"
 env_logger = "0.11.8"
+err_trail = "0.10.2" 
+error_set = "0.9.0" 
 figlet-rs = "0.1.5"
 figment = { version = "0.10.19", features = ["toml", "env"] }
 flume = "0.11.1"
diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml
index f4bf769ec..71b3f5157 100644
--- a/core/common/Cargo.toml
+++ b/core/common/Cargo.toml
@@ -43,8 +43,11 @@ bytes = { workspace = true }
 chrono = { workspace = true }
 clap = { workspace = true }
 comfy-table = { workspace = true }
+crossbeam = { workspace = true }
 derive_more = { workspace = true }
 fast-async-mutex = { version = "0.6.7", optional = true }
+human-repr = { workspace = true }
+once_cell = { workspace = true }
 figment = { workspace = true }
 humantime = { workspace = true }
 rcgen = "0.14.5"
@@ -59,3 +62,9 @@ toml = { workspace = true }
 tracing = { workspace = true }
 tungstenite = { workspace = true }
 twox-hash = { workspace = true }
+compio = { workspace = true }
+compio-quic = { workspace = true }
+compio-tls = { workspace = true }
+compio-ws = { workspace = true }
+err_trail = { workspace = true }
+error_set = { workspace = true }
diff --git a/core/common/src/alloc/buffer.rs b/core/common/src/alloc/buffer.rs
new file mode 100644
index 000000000..f8ff1186b
--- /dev/null
+++ b/core/common/src/alloc/buffer.rs
@@ -0,0 +1,285 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::memory_pool::{BytesMutExt, memory_pool};
+use bytes::{Buf, BufMut, BytesMut};
+use compio::buf::{IoBuf, IoBufMut, SetBufInit};
+use std::ops::{Deref, DerefMut};
+
+/// A buffer wrapper that participates in memory pooling.
+///
+/// This buffer automatically acquires memory from the global memory pool
+/// and returns it when dropped. It also tracks resize events to keep
+/// pool accounting accurate.
+#[derive(Debug)]
+pub struct PooledBuffer {
+    from_pool: bool,
+    original_capacity: usize,
+    original_bucket_idx: Option<usize>,
+    inner: BytesMut,
+}
+
+impl Default for PooledBuffer {
+    fn default() -> Self {
+        Self::empty()
+    }
+}
+
+impl PooledBuffer {
+    /// Creates a new pooled buffer with the specified capacity.
+    ///
+    /// # Arguments
+    ///
+    /// * `capacity` - The capacity of the buffer
+    pub fn with_capacity(capacity: usize) -> Self {
+        let (buffer, was_pool_allocated) = 
memory_pool().acquire_buffer(capacity);
+        let original_capacity = buffer.capacity();
+        let original_bucket_idx = if was_pool_allocated {
+            memory_pool().best_fit(original_capacity)
+        } else {
+            None
+        };
+        Self {
+            from_pool: was_pool_allocated,
+            original_capacity,
+            original_bucket_idx,
+            inner: buffer,
+        }
+    }
+
+    /// Creates a new pooled buffer from an existing `BytesMut`.
+    ///
+    /// # Arguments
+    ///
+    /// * `existing` - The existing `BytesMut` buffer
+    pub fn from_existing(existing: BytesMut) -> Self {
+        Self {
+            from_pool: false,
+            original_capacity: existing.capacity(),
+            original_bucket_idx: None,
+            inner: existing,
+        }
+    }
+
+    /// Creates an empty pooled buffer.
+    pub fn empty() -> Self {
+        Self {
+            from_pool: false,
+            original_capacity: 0,
+            original_bucket_idx: None,
+            inner: BytesMut::new(),
+        }
+    }
+
+    /// Checks if the buffer needs to be resized and updates the memory pool 
accordingly.
+    /// This shall be called after operations that might cause a resize.
+    pub fn check_for_resize(&mut self) {
+        if !self.from_pool {
+            return;
+        }
+
+        let current_capacity = self.inner.capacity();
+        if current_capacity != self.original_capacity {
+            memory_pool().inc_resize_events();
+
+            if let Some(orig_idx) = self.original_bucket_idx {
+                memory_pool().dec_bucket_in_use(orig_idx);
+
+                if let Some(new_idx) = 
memory_pool().best_fit(current_capacity) {
+                    // Track as a new allocation in the new bucket
+                    memory_pool().inc_bucket_alloc(new_idx);
+                    memory_pool().inc_bucket_in_use(new_idx);
+                    self.original_bucket_idx = Some(new_idx);
+                } else {
+                    // Track as an external allocation if no bucket fits
+                    memory_pool().inc_external_allocations();
+                    self.original_bucket_idx = None;
+                }
+            }
+
+            self.original_capacity = current_capacity;
+        }
+    }
+
+    /// Wrapper for reserve which might cause resize
+    pub fn reserve(&mut self, additional: usize) {
+        let before_cap = self.inner.capacity();
+        self.inner.reserve(additional);
+
+        if self.inner.capacity() != before_cap {
+            self.check_for_resize();
+        }
+    }
+
+    /// Wrapper for extend_from_slice which might cause resize
+    pub fn extend_from_slice(&mut self, extend_from: &[u8]) {
+        let before_cap = self.inner.capacity();
+        self.inner.extend_from_slice(extend_from);
+
+        if self.inner.capacity() != before_cap {
+            self.check_for_resize();
+        }
+    }
+
+    /// Wrapper for put_bytes which might cause resize
+    pub fn put_bytes(&mut self, byte: u8, len: usize) {
+        let before_cap = self.inner.capacity();
+        self.inner.put_bytes(byte, len);
+
+        if self.inner.capacity() != before_cap {
+            self.check_for_resize();
+        }
+    }
+
+    /// Wrapper for put_slice which might cause resize
+    pub fn put_slice(&mut self, src: &[u8]) {
+        let before_cap = self.inner.capacity();
+        self.inner.put_slice(src);
+
+        if self.inner.capacity() != before_cap {
+            self.check_for_resize();
+        }
+    }
+
+    /// Wrapper for put_u32_le which might cause resize
+    pub fn put_u32_le(&mut self, value: u32) {
+        let before_cap = self.inner.capacity();
+        self.inner.put_u32_le(value);
+
+        if self.inner.capacity() != before_cap {
+            self.check_for_resize();
+        }
+    }
+
+    /// Wrapper for put_u64_le which might cause resize
+    pub fn put_u64_le(&mut self, value: u64) {
+        let before_cap = self.inner.capacity();
+        self.inner.put_u64_le(value);
+
+        if self.inner.capacity() != before_cap {
+            self.check_for_resize();
+        }
+    }
+
+    /// Returns the capacity of the inner buffer
+    pub fn capacity(&self) -> usize {
+        self.inner.capacity()
+    }
+
+    /// Returns the length of the inner buffer
+    pub fn len(&self) -> usize {
+        self.inner.len()
+    }
+
+    /// Returns true if the buffer is empty
+    pub fn is_empty(&self) -> bool {
+        self.inner.is_empty()
+    }
+
+    /// Consumes the PooledBuffer and returns the inner BytesMut.
+    /// Note: This bypasses pool return logic, use with caution.
+    pub fn into_inner(self) -> BytesMut {
+        let mut this = std::mem::ManuallyDrop::new(self);
+        std::mem::take(&mut this.inner)
+    }
+}
+
+impl Deref for PooledBuffer {
+    type Target = BytesMut;
+
+    fn deref(&self) -> &Self::Target {
+        &self.inner
+    }
+}
+
+impl DerefMut for PooledBuffer {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.inner
+    }
+}
+
+impl Drop for PooledBuffer {
+    fn drop(&mut self) {
+        if self.from_pool {
+            let buf = std::mem::take(&mut self.inner);
+            buf.return_to_pool(self.original_capacity, true);
+        }
+    }
+}
+
+impl From<&[u8]> for PooledBuffer {
+    fn from(slice: &[u8]) -> Self {
+        let mut buf = PooledBuffer::with_capacity(slice.len());
+        buf.inner.extend_from_slice(slice);
+        buf
+    }
+}
+
+impl From<BytesMut> for PooledBuffer {
+    fn from(bytes: BytesMut) -> Self {
+        Self::from_existing(bytes)
+    }
+}
+
+impl Buf for PooledBuffer {
+    fn remaining(&self) -> usize {
+        self.inner.remaining()
+    }
+
+    fn chunk(&self) -> &[u8] {
+        self.inner.chunk()
+    }
+
+    fn advance(&mut self, cnt: usize) {
+        self.inner.advance(cnt)
+    }
+
+    fn chunks_vectored<'t>(&'t self, dst: &mut [std::io::IoSlice<'t>]) -> 
usize {
+        self.inner.chunks_vectored(dst)
+    }
+}
+
+impl SetBufInit for PooledBuffer {
+    unsafe fn set_buf_init(&mut self, len: usize) {
+        if self.inner.len() <= len {
+            unsafe {
+                self.inner.set_len(len);
+            }
+        }
+    }
+}
+
+unsafe impl IoBufMut for PooledBuffer {
+    fn as_buf_mut_ptr(&mut self) -> *mut u8 {
+        self.inner.as_mut_ptr()
+    }
+}
+
+unsafe impl IoBuf for PooledBuffer {
+    fn as_buf_ptr(&self) -> *const u8 {
+        self.inner.as_buf_ptr()
+    }
+
+    fn buf_len(&self) -> usize {
+        self.inner.len()
+    }
+
+    fn buf_capacity(&self) -> usize {
+        self.inner.capacity()
+    }
+}
\ No newline at end of file
diff --git a/core/common/src/alloc/memory_pool.rs 
b/core/common/src/alloc/memory_pool.rs
new file mode 100644
index 000000000..b696a8b6b
--- /dev/null
+++ b/core/common/src/alloc/memory_pool.rs
@@ -0,0 +1,737 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use bytes::BytesMut;
+use crossbeam::queue::ArrayQueue;
+use human_repr::HumanCount;
+use once_cell::sync::OnceCell;
+use serde::{Deserialize, Serialize};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use tracing::{info, trace, warn};
+
+/// Global memory pool instance. Use `memory_pool()` to access it.
+pub static MEMORY_POOL: OnceCell<MemoryPool> = OnceCell::new();
+
+/// Total number of distinct bucket sizes.
+const NUM_BUCKETS: usize = 32;
+
+/// Array of bucket sizes in ascending order. Each entry is a distinct buffer 
size (in bytes).
+const BUCKET_SIZES: [usize; NUM_BUCKETS] = [
+    256,
+    512,
+    1024,
+    2 * 1024,
+    4 * 1024,
+    8 * 1024,
+    16 * 1024,
+    32 * 1024,
+    64 * 1024,
+    128 * 1024,
+    256 * 1024,
+    512 * 1024,
+    768 * 1024,
+    1024 * 1024,
+    1536 * 1024,
+    2 * 1024 * 1024, // Above 2MiB everything should be rounded up to the next 
power of 2 to take advantage of hugepages
+    4 * 1024 * 1024, // (environment variables MIMALLOC_ALLOW_LARGE_OS_PAGES=1 
and MIMALLOC_LARGE_OS_PAGES=1).
+    6 * 1024 * 1024,
+    8 * 1024 * 1024,
+    10 * 1024 * 1024,
+    12 * 1024 * 1024,
+    16 * 1024 * 1024,
+    24 * 1024 * 1024,
+    32 * 1024 * 1024,
+    48 * 1024 * 1024,
+    64 * 1024 * 1024,
+    96 * 1024 * 1024,
+    128 * 1024 * 1024,
+    192 * 1024 * 1024,
+    256 * 1024 * 1024,
+    384 * 1024 * 1024,
+    512 * 1024 * 1024,
+];
+
+/// Retrieve the global `MemoryPool` instance. Panics if not yet initialized.
+pub fn memory_pool() -> &'static MemoryPool {
+    MEMORY_POOL
+        .get()
+        .expect("Memory pool not initialized - MemoryPool::init_pool should be 
called first")
+}
+
+/// Configuration for the memory pool.
+#[derive(Deserialize, Serialize, Debug)]
+pub struct MemoryPoolConfig {
+    /// Whether the pool is enabled.
+    pub enabled: bool,
+    /// Maximum size of the pool.
+    pub size: crate::IggyByteSize,
+    /// Maximum number of buffers per bucket.
+    pub bucket_capacity: u32,
+}
+
+/// A memory pool that maintains fixed-size buckets for reusing `BytesMut` 
buffers.
+///
+/// Each bucket corresponds to a particular size in `BUCKET_SIZES`. The pool 
tracks:
+/// - Buffers currently in use (`in_use`)
+/// - Buffers allocated historically (`allocations`)
+/// - Buffers returned to the pool (`returned`)
+/// - External allocations/deallocations (buffers allocated outside the pool 
limit)
+/// - Resizes and dropped returns
+#[derive(Clone)]
+pub struct MemoryPool {
+    /// Whether the pool is enabled.
+    pub is_enabled: bool,
+
+    /// Configured maximum bytes for which the pool is responsible.
+    pub memory_limit: usize,
+
+    /// Configured maximum number of buffers in each bucket.
+    pub bucket_capacity: usize,
+
+    /// Array of queues for reusable buffers. Each queue can store up to 
`bucket_capacity` buffers.
+    /// The length of each queue (`buckets[i].len()`) is how many **free** 
buffers are currently available.
+    /// Free doesn't mean the buffer is allocated, it just means it's not in 
use.
+    buckets: [Arc<ArrayQueue<BytesMut>>; NUM_BUCKETS],
+
+    /// Number of buffers **in use** for each bucket size (grow/shrink as they 
are acquired/released).
+    in_use: [Arc<AtomicUsize>; NUM_BUCKETS],
+
+    /// Total number of buffers **ever allocated** in each bucket. 
Monotonically increasing.
+    allocations: [Arc<AtomicUsize>; NUM_BUCKETS],
+
+    /// Total number of buffers **ever returned** to each bucket. 
Monotonically increasing.
+    returned: [Arc<AtomicUsize>; NUM_BUCKETS],
+
+    /// Count of buffers allocated outside the pool limit (e.g., if usage 
exceeds `memory_limit` or no matching bucket).
+    external_allocations: Arc<AtomicUsize>,
+
+    /// Count of buffers deallocated outside the pool (e.g., returning a 
buffer that doesn't match any bucket).
+    external_deallocations: Arc<AtomicUsize>,
+
+    /// Number of resize events detected (the buffer capacity changed between 
acquire and release).
+    resize_events: Arc<AtomicUsize>,
+
+    /// Number of returns that couldn't be stored because the relevant bucket 
queue was at capacity.
+    dropped_returns: Arc<AtomicUsize>,
+
+    /// Flag set when the pool usage first exceeds `memory_limit`. Used to log 
a single warning.
+    capacity_warning: Arc<AtomicBool>,
+}
+
+impl MemoryPool {
+    /// Create a new memory pool. Usually called from `init_pool` below.
+    pub fn new(is_enabled: bool, memory_limit: usize, bucket_capacity: usize) 
-> Self {
+        let buckets = [0; NUM_BUCKETS].map(|_| 
Arc::new(ArrayQueue::new(bucket_capacity)));
+
+        if is_enabled {
+            info!(
+                "Initializing MemoryPool with {NUM_BUCKETS} buckets, each will 
have capacity: {bucket_capacity}."
+            );
+        } else {
+            info!("MemoryPool is disabled.");
+        }
+
+        Self {
+            is_enabled,
+            memory_limit,
+            bucket_capacity,
+            buckets,
+            in_use: [0; NUM_BUCKETS].map(|_| Arc::new(AtomicUsize::new(0))),
+            allocations: [0; NUM_BUCKETS].map(|_| 
Arc::new(AtomicUsize::new(0))),
+            returned: [0; NUM_BUCKETS].map(|_| Arc::new(AtomicUsize::new(0))),
+            external_allocations: Arc::new(AtomicUsize::new(0)),
+            external_deallocations: Arc::new(AtomicUsize::new(0)),
+            resize_events: Arc::new(AtomicUsize::new(0)),
+            dropped_returns: Arc::new(AtomicUsize::new(0)),
+            capacity_warning: Arc::new(AtomicBool::new(false)),
+        }
+    }
+
+    /// Initialize the global pool from the given config.
+    pub fn init_pool(config: &MemoryPoolConfig) {
+        let is_enabled = config.enabled;
+        let memory_limit = config.size.as_bytes_usize();
+        let bucket_capacity = config.bucket_capacity as usize;
+
+        let _ =
+            MEMORY_POOL.get_or_init(|| MemoryPool::new(is_enabled, 
memory_limit, bucket_capacity));
+    }
+
+    /// Acquire a `BytesMut` buffer with at least `capacity` bytes.
+    ///
+    /// - If a bucket can fit `capacity`, try to pop from its free buffer 
queue; otherwise create a new buffer.
+    /// - If `memory_limit` would be exceeded, allocate outside the pool.
+    ///
+    /// Returns a tuple of (buffer, was_pool_allocated) where 
was_pool_allocated indicates if the buffer
+    /// was allocated from the pool (true) or externally (false).
+    pub fn acquire_buffer(&self, capacity: usize) -> (BytesMut, bool) {
+        if !self.is_enabled {
+            return (BytesMut::with_capacity(capacity), false);
+        }
+
+        let current = self.pool_current_size();
+
+        match self.best_fit(capacity) {
+            Some(idx) => {
+                if let Some(mut buf) = self.buckets[idx].pop() {
+                    buf.clear();
+                    self.inc_bucket_in_use(idx);
+                    return (buf, true);
+                }
+
+                let new_size = BUCKET_SIZES[idx];
+                if current + new_size > self.memory_limit {
+                    self.set_capacity_warning(true);
+                    trace!(
+                        "Pool is at capacity. Allocating outside the pool: 
requested {} B, current usage {} B, limit {} B",
+                        new_size, current, self.memory_limit
+                    );
+                    self.inc_external_allocations();
+                    return (BytesMut::with_capacity(new_size), false);
+                }
+
+                self.inc_bucket_alloc(idx);
+                self.inc_bucket_in_use(idx);
+                (BytesMut::with_capacity(new_size), true)
+            }
+            None => {
+                if current + capacity > self.memory_limit {
+                    trace!(
+                        "Pool is at capacity. Allocating outside the pool: 
requested {} B, current usage {} B, limit {} B",
+                        capacity, current, self.memory_limit
+                    );
+                    self.inc_external_allocations();
+                    return (BytesMut::with_capacity(capacity), false);
+                }
+
+                self.inc_external_allocations();
+                (BytesMut::with_capacity(capacity), false)
+            }
+        }
+    }
+
+    /// Return a `BytesMut` buffer previously acquired from the pool.
+    ///
+    /// - If `current_capacity` differs from `original_capacity`, increments 
`resize_events`.
+    /// - If a matching bucket exists, place it back in that bucket's queue 
(if space is available).
+    /// - Otherwise, treat it as an external deallocation.
+    /// - The `was_pool_allocated` flag indicates if this buffer was 
originally allocated from the pool.
+    pub fn release_buffer(
+        &self,
+        buffer: BytesMut,
+        original_capacity: usize,
+        was_pool_allocated: bool,
+    ) {
+        if !self.is_enabled {
+            return;
+        }
+
+        let current_capacity = buffer.capacity();
+        if current_capacity != original_capacity {
+            self.inc_resize_events();
+            trace!(
+                "Buffer capacity {} != original {} when returning",
+                current_capacity, original_capacity
+            );
+        }
+
+        if was_pool_allocated && let Some(orig_idx) = 
self.best_fit(original_capacity) {
+            self.dec_bucket_in_use(orig_idx);
+        }
+
+        match self.best_fit(current_capacity) {
+            Some(idx) => {
+                self.inc_bucket_return(idx);
+
+                if self.buckets[idx].push(buffer).is_err() {
+                    self.inc_dropped_returns();
+                    trace!(
+                        "Pool full for size: {} B, dropping buffer",
+                        BUCKET_SIZES[idx]
+                    );
+                    self.set_capacity_warning(true);
+                }
+            }
+            None => {
+                self.inc_external_deallocations();
+                trace!("Returned outside-of-pool buffer, capacity: 
{current_capacity}, dropping");
+            }
+        }
+    }
+
+    /// Write a log message summarizing current usage, allocations, etc.
+    /// Only logs if the pool is enabled and `pool_current_size()` is non-zero.
+    /// Also logs a warning if the pool usage has exceeded `memory_limit`.
+    pub fn log_stats(&self) {
+        if !self.is_enabled || self.pool_current_size() == 0 {
+            return;
+        }
+
+        let bucket_stats = (0..NUM_BUCKETS)
+            .filter_map(|i| {
+                let current_el = self.bucket_current_elements(i);
+                let allocated_el = self.bucket_allocated_elements(i);
+
+                if current_el > 0 || allocated_el > 0 {
+                    Some(format!(
+                        
"{label}:[{current_el}/{current_size}/{allocated_el}/{allocated_size}/{returns}]",
+                        label = size_str(BUCKET_SIZES[i]),
+                        current_el = current_el.human_count_bare(),
+                        current_size = size_str(self.bucket_current_size(i)),
+                        allocated_el = allocated_el.human_count_bare(),
+                        allocated_size = 
size_str(self.bucket_allocated_size(i)),
+                        returns = self.bucket_returns(i).human_count_bare(),
+                    ))
+                } else {
+                    None
+                }
+            })
+            .collect::<Vec<String>>()
+            .join(", ");
+
+        info!(
+            "Pool Buckets: {bucket_stats} 
(BucketLabel:[InUseCount/InUseSize/AllocCount/AllocSize/Returns])"
+        );
+        info!(
+            "Pool Summary: 
Curr:{current}/Alloc:{allocated}/Util:{util:.1}%/Limit:{limit}/ExtAlloc:{ext_alloc}/ExtDealloc:{ext_dealloc}/DropRet:{drop_ret}/Resizes:{resize_events}/BucketCap:{cap}",
+            current = size_str(self.pool_current_size()),
+            allocated = size_str(self.pool_allocated_size()),
+            util = self.pool_utilization(),
+            limit = size_str(self.pool_maximum_size()),
+            ext_alloc = self.external_allocations(),
+            ext_dealloc = self.external_deallocations(),
+            drop_ret = self.dropped_returns(),
+            resize_events = self.resize_events(),
+            cap = self.bucket_capacity,
+        );
+
+        if self.should_print_warning() {
+            warn!("Memory pool usage exceeded limit! Consider adjusting 
memory_pool.size.");
+            self.set_capacity_warning(false);
+        }
+    }
+
+    #[inline]
+    pub fn best_fit(&self, capacity: usize) -> Option<usize> {
+        match BUCKET_SIZES.binary_search(&capacity) {
+            Ok(idx) => Some(idx),
+            Err(idx) => {
+                if idx < NUM_BUCKETS {
+                    Some(idx)
+                } else {
+                    None
+                }
+            }
+        }
+    }
+
+    /// Returns the configured maximum size of the pool, usually from config, 
in bytes.
+    fn pool_maximum_size(&self) -> usize {
+        self.memory_limit
+    }
+
+    /// Sums the sizes (in bytes) of all buffers currently in use across the 
pool.
+    fn pool_current_size(&self) -> usize {
+        (0..NUM_BUCKETS)
+            .filter(|&i| self.bucket_current_elements(i) > 0)
+            .map(|i| self.bucket_current_size(i))
+            .sum()
+    }
+    /// Sums the sizes (in bytes) of all buffers ever allocated across the 
pool (historical).
+    fn pool_allocated_size(&self) -> usize {
+        let mut size = 0;
+        for i in 0..NUM_BUCKETS {
+            size += self.bucket_allocated_size(i);
+        }
+        size
+    }
+
+    /// Returns pool utilization percentage.
+    fn pool_utilization(&self) -> f64 {
+        (self.pool_current_size() as f64 / self.pool_maximum_size() as f64) * 
100.0
+    }
+
+    fn bucket_current_elements(&self, idx: usize) -> usize {
+        self.in_use[idx].load(Ordering::Acquire)
+    }
+
+    fn bucket_current_size(&self, idx: usize) -> usize {
+        self.bucket_current_elements(idx) * BUCKET_SIZES[idx]
+    }
+
+    fn bucket_allocated_elements(&self, idx: usize) -> usize {
+        self.allocations[idx].load(Ordering::Acquire)
+    }
+
+    fn bucket_allocated_size(&self, idx: usize) -> usize {
+        self.bucket_allocated_elements(idx) * BUCKET_SIZES[idx]
+    }
+
+    fn bucket_returns(&self, idx: usize) -> usize {
+        self.returned[idx].load(Ordering::Acquire)
+    }
+
+    fn resize_events(&self) -> usize {
+        self.resize_events.load(Ordering::Acquire)
+    }
+
+    fn dropped_returns(&self) -> usize {
+        self.dropped_returns.load(Ordering::Acquire)
+    }
+
+    fn external_allocations(&self) -> usize {
+        self.external_allocations.load(Ordering::Acquire)
+    }
+
+    fn external_deallocations(&self) -> usize {
+        self.external_deallocations.load(Ordering::Acquire)
+    }
+
+    pub(super) fn inc_resize_events(&self) {
+        self.resize_events.fetch_add(1, Ordering::Release);
+    }
+
+    pub(super) fn inc_dropped_returns(&self) {
+        self.dropped_returns.fetch_add(1, Ordering::Release);
+    }
+
+    pub(super) fn inc_external_allocations(&self) {
+        self.external_allocations.fetch_add(1, Ordering::Release);
+    }
+
+    pub(super) fn inc_external_deallocations(&self) {
+        self.external_deallocations.fetch_add(1, Ordering::Release);
+    }
+
+    pub(super) fn inc_bucket_alloc(&self, idx: usize) {
+        self.allocations[idx].fetch_add(1, Ordering::Release);
+    }
+
+    pub(super) fn inc_bucket_return(&self, idx: usize) {
+        self.returned[idx].fetch_add(1, Ordering::Release);
+    }
+
+    pub(super) fn inc_bucket_in_use(&self, idx: usize) {
+        self.in_use[idx].fetch_add(1, Ordering::Release);
+    }
+
+    pub(super) fn dec_bucket_in_use(&self, idx: usize) {
+        self.in_use[idx].fetch_sub(1, Ordering::Release);
+    }
+
+    fn should_print_warning(&self) -> bool {
+        self.capacity_warning.load(Ordering::Acquire)
+    }
+
+    fn set_capacity_warning(&self, value: bool) {
+        self.capacity_warning.store(value, Ordering::Release);
+    }
+}
+
+/// Return a buffer to the pool by calling `release_buffer` with the original 
capacity.
+/// This extension trait makes it easy to do 
`some_bytes.return_to_pool(orig_cap, was_pool_allocated)`.
+pub trait BytesMutExt {
+    fn return_to_pool(self, original_capacity: usize, was_pool_allocated: 
bool);
+}
+
+impl BytesMutExt for BytesMut {
+    fn return_to_pool(self, original_capacity: usize, was_pool_allocated: 
bool) {
+        memory_pool().release_buffer(self, original_capacity, 
was_pool_allocated);
+    }
+}
+
+/// Convert a size in bytes to a string like "8KiB" or "2MiB".
+fn size_str(size: usize) -> String {
+    if size >= 1024 * 1024 {
+        format!("{}MiB", size / (1024 * 1024))
+    } else if size >= 1024 {
+        format!("{}KiB", size / 1024)
+    } else {
+        format!("{size}B")
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::{IggyByteSize, alloc::buffer::PooledBuffer};
+
+    use super::*;
+    use std::{str::FromStr, sync::Once};
+
+    static TEST_INIT: Once = Once::new();
+
+    fn initialize_pool_for_tests() {
+        TEST_INIT.call_once(|| {
+            let config = 
+                MemoryPoolConfig {
+                    enabled: true,
+                    size: IggyByteSize::from_str("4GiB").unwrap(),
+                    bucket_capacity: 8192,
+                };
+            MemoryPool::init_pool(&config);
+        });
+    }
+
+    #[test]
+    fn test_pooled_buffer_resize_tracking() {
+        initialize_pool_for_tests();
+
+        let pool = memory_pool();
+        let initial_resize_events = pool.resize_events();
+
+        let small_bucket_idx = 2;
+        let small_bucket_size = BUCKET_SIZES[small_bucket_idx];
+
+        let mut buffer = PooledBuffer::with_capacity(small_bucket_size);
+
+        assert_eq!(
+            buffer.capacity(),
+            small_bucket_size,
+            "Initial capacity should match requested size"
+        );
+
+        let original_in_use_before = 
pool.bucket_current_elements(small_bucket_idx);
+
+        // Force a resize with data significantly larger than current bucket
+        // Use 256KiB data to ensure it goes to a much larger bucket
+        let large_bucket_idx = 6;
+        let large_data_size = BUCKET_SIZES[large_bucket_idx] - 1024;
+        buffer.put_slice(&vec![0u8; large_data_size]);
+
+        assert!(
+            buffer.capacity() >= large_data_size,
+            "Buffer should resize to fit data"
+        );
+
+        assert_eq!(
+            pool.resize_events(),
+            initial_resize_events + 1,
+            "Resize event should be recorded"
+        );
+
+        assert_eq!(
+            pool.bucket_current_elements(small_bucket_idx),
+            original_in_use_before - 1,
+            "Original bucket in-use count should decrease by 1"
+        );
+
+        let new_bucket_idx = pool.best_fit(buffer.capacity()).unwrap();
+        assert!(
+            new_bucket_idx > small_bucket_idx,
+            "Buffer should move to a larger bucket"
+        );
+
+        let new_bucket_in_use = pool.bucket_current_elements(new_bucket_idx);
+        assert!(
+            new_bucket_in_use > 0,
+            "New bucket should have buffers in use"
+        );
+
+        drop(buffer);
+
+        assert_eq!(
+            pool.bucket_current_elements(small_bucket_idx),
+            original_in_use_before - 1,
+            "Small bucket count should remain decreased after drop"
+        );
+
+        assert_eq!(
+            pool.bucket_current_elements(new_bucket_idx),
+            new_bucket_in_use - 1,
+            "New bucket in-use count should decrease after drop"
+        );
+    }
+
+    #[test]
+    fn test_multiple_resize_operations() {
+        initialize_pool_for_tests();
+
+        let pool = memory_pool();
+        let initial_resize_events = pool.resize_events();
+
+        let mut buffer = PooledBuffer::with_capacity(BUCKET_SIZES[1]); // 8KiB
+        let original_bucket_idx = pool.best_fit(buffer.capacity()).unwrap();
+        let original_bucket_in_use = 
pool.bucket_current_elements(original_bucket_idx);
+
+        let first_resize_size = BUCKET_SIZES[4]; // 64KiB
+        buffer.put_slice(&vec![0u8; first_resize_size]);
+
+        assert!(
+            buffer.capacity() >= first_resize_size,
+            "Buffer should increase capacity after first resize"
+        );
+        assert_eq!(
+            pool.resize_events(),
+            initial_resize_events + 1,
+            "One resize event should be recorded"
+        );
+
+        let mid_bucket_idx = pool.best_fit(buffer.capacity()).unwrap();
+        assert!(
+            mid_bucket_idx > original_bucket_idx,
+            "Buffer should move to a larger bucket after first resize"
+        );
+
+        let mid_bucket_in_use = pool.bucket_current_elements(mid_bucket_idx);
+        assert_eq!(
+            pool.bucket_current_elements(original_bucket_idx),
+            original_bucket_in_use - 1,
+            "Original bucket count should decrease after first resize"
+        );
+
+        let second_resize_size = BUCKET_SIZES[9]; // 1MiB
+        buffer.put_slice(&vec![0u8; second_resize_size]);
+
+        assert!(
+            buffer.capacity() >= second_resize_size,
+            "Buffer should increase capacity after second resize"
+        );
+        assert_eq!(
+            pool.resize_events(),
+            initial_resize_events + 2,
+            "Two resize events should be recorded"
+        );
+
+        let final_bucket_idx = pool.best_fit(buffer.capacity()).unwrap();
+        assert!(
+            final_bucket_idx > mid_bucket_idx,
+            "Buffer should move to an even larger bucket after second resize"
+        );
+
+        let final_bucket_in_use = 
pool.bucket_current_elements(final_bucket_idx);
+        assert_eq!(
+            pool.bucket_current_elements(mid_bucket_idx),
+            mid_bucket_in_use - 1,
+            "Mid bucket count should decrease after second resize"
+        );
+
+        drop(buffer);
+
+        assert_eq!(
+            pool.bucket_current_elements(original_bucket_idx),
+            original_bucket_in_use - 1,
+            "Original bucket should remain decreased"
+        );
+
+        assert_eq!(
+            pool.bucket_current_elements(mid_bucket_idx),
+            mid_bucket_in_use - 1,
+            "Mid bucket should remain decreased"
+        );
+
+        assert_eq!(
+            pool.bucket_current_elements(final_bucket_idx),
+            final_bucket_in_use - 1,
+            "Final bucket count should decrease after drop"
+        );
+    }
+
+    #[test]
+    fn test_different_resize_methods() {
+        initialize_pool_for_tests();
+
+        let pool = memory_pool();
+
+        // Test put_slice
+        {
+            let initial_events = pool.resize_events();
+            let mut buffer = PooledBuffer::with_capacity(4 * 1024);
+            let orig_bucket_idx = pool.best_fit(buffer.capacity()).unwrap();
+            let orig_in_use = pool.bucket_current_elements(orig_bucket_idx);
+
+            buffer.put_slice(&vec![0u8; 64 * 1024]);
+
+            assert_eq!(
+                pool.resize_events(),
+                initial_events + 1,
+                "put_slice should trigger resize event"
+            );
+            assert_eq!(
+                pool.bucket_current_elements(orig_bucket_idx),
+                orig_in_use - 1,
+                "put_slice should update bucket accounting"
+            );
+        }
+
+        // Test put_bytes
+        {
+            let initial_events = pool.resize_events();
+            let mut buffer = PooledBuffer::with_capacity(4 * 1024);
+            let orig_bucket_idx = pool.best_fit(buffer.capacity()).unwrap();
+            let orig_in_use = pool.bucket_current_elements(orig_bucket_idx);
+
+            buffer.put_bytes(0, 64 * 1024); // 64KiB of zeros
+
+            assert_eq!(
+                pool.resize_events(),
+                initial_events + 1,
+                "put_bytes should trigger resize event"
+            );
+            assert_eq!(
+                pool.bucket_current_elements(orig_bucket_idx),
+                orig_in_use - 1,
+                "put_bytes should update bucket accounting"
+            );
+        }
+
+        // Test extend_from_slice
+        {
+            let initial_events = pool.resize_events();
+            let mut buffer = PooledBuffer::with_capacity(4 * 1024);
+            let orig_bucket_idx = pool.best_fit(buffer.capacity()).unwrap();
+            let orig_in_use = pool.bucket_current_elements(orig_bucket_idx);
+
+            buffer.extend_from_slice(&vec![0u8; 64 * 1024]);
+
+            assert_eq!(
+                pool.resize_events(),
+                initial_events + 1,
+                "extend_from_slice should trigger resize event"
+            );
+            assert_eq!(
+                pool.bucket_current_elements(orig_bucket_idx),
+                orig_in_use - 1,
+                "extend_from_slice should update bucket accounting"
+            );
+        }
+
+        // Test reserve
+        {
+            let initial_events = pool.resize_events();
+            let mut buffer = PooledBuffer::with_capacity(4 * 1024);
+            let orig_bucket_idx = pool.best_fit(buffer.capacity()).unwrap();
+            let orig_in_use = pool.bucket_current_elements(orig_bucket_idx);
+
+            buffer.reserve(64 * 1024);
+
+            if buffer.capacity() > 4 * 1024 {
+                assert_eq!(
+                    pool.resize_events(),
+                    initial_events + 1,
+                    "reserve should trigger resize event when capacity changes"
+                );
+                assert_eq!(
+                    pool.bucket_current_elements(orig_bucket_idx),
+                    orig_in_use - 1,
+                    "reserve should update bucket accounting when capacity 
changes"
+                );
+            }
+        }
+    }
+}
diff --git a/core/common/src/alloc/mod.rs b/core/common/src/alloc/mod.rs
new file mode 100644
index 000000000..4502a53ce
--- /dev/null
+++ b/core/common/src/alloc/mod.rs
@@ -0,0 +1,21 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+pub(crate) mod buffer;
+pub(crate) mod memory_pool;
+
diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs
index 032ea4bb9..6ed2265e7 100644
--- a/core/common/src/lib.rs
+++ b/core/common/src/lib.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+mod alloc;
 mod certificates;
 mod commands;
 mod configs;
@@ -27,6 +28,7 @@ pub use error::client_error::ClientError;
 pub use error::iggy_error::{IggyError, IggyErrorDiscriminants};
 // Locking is feature gated, thus only mod level re-export.
 pub mod locking;
+pub use alloc::memory_pool::MemoryPoolConfig;
 pub use certificates::generate_self_signed_certificate;
 pub use commands::consumer_groups::*;
 pub use commands::consumer_offsets::*;
@@ -87,6 +89,7 @@ pub use types::topic::*;
 pub use types::user::user_identity_info::*;
 pub use types::user::user_info::*;
 pub use types::user::user_status::*;
+pub use types::sender::{Sender, SenderKind, TcpSender, TcpTlsSender, 
QuicSender, WebSocketSender, WebSocketTlsSender};
 pub use utils::byte_size::IggyByteSize;
 pub use utils::checksum::*;
 pub use utils::crypto::*;
diff --git a/core/common/src/types/mod.rs b/core/common/src/types/mod.rs
index f0d70cf9c..0eb0da82b 100644
--- a/core/common/src/types/mod.rs
+++ b/core/common/src/types/mod.rs
@@ -34,3 +34,4 @@ pub(crate) mod stats;
 pub(crate) mod stream;
 pub(crate) mod topic;
 pub(crate) mod user;
+pub(crate) mod sender;
diff --git a/core/common/src/types/sender/mod.rs 
b/core/common/src/types/sender/mod.rs
new file mode 100644
index 000000000..fe8ec7cf2
--- /dev/null
+++ b/core/common/src/types/sender/mod.rs
@@ -0,0 +1,230 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+mod quic_sender;
+mod tcp_sender;
+mod tcp_tls_sender;
+mod websocket_sender;
+mod websocket_tls_sender;
+
+pub use quic_sender::QuicSender;
+pub use tcp_sender::TcpSender;
+pub use tcp_tls_sender::TcpTlsSender;
+pub use websocket_sender::WebSocketSender;
+pub use websocket_tls_sender::WebSocketTlsSender;
+
+use crate::IggyError;
+use crate::alloc::buffer::PooledBuffer;
+use compio::buf::IoBufMut;
+use compio::io::{AsyncReadExt, AsyncWriteExt};
+use compio::net::TcpStream;
+use compio::BufResult;
+use compio_quic::{RecvStream, SendStream};
+use compio_tls::TlsStream;
+use std::future::Future;
+use tracing::debug;
+
+macro_rules! forward_async_methods {
+    (
+        $(
+            async fn $method_name:ident
+            $(<$($generic:ident $(: $bound:path)?),+>)?
+            (
+                &mut self $(, $arg:ident : $arg_ty:ty )*
+            ) -> $ret:ty ;
+        )*
+    ) => {
+        $(
+            pub async fn $method_name
+            $(<$($generic $(: $bound)?),+>)?
+            (&mut self, $( $arg: $arg_ty ),* ) -> $ret {
+                match self {
+                    Self::Tcp(d) => d.$method_name$(::<$($generic),+>)?($( 
$arg ),*).await,
+                    Self::TcpTls(s) => s.$method_name$(::<$($generic),+>)?($( 
$arg ),*).await,
+                    Self::Quic(s) => s.$method_name$(::<$($generic),+>)?($( 
$arg ),*).await,
+                    Self::WebSocket(s) => 
s.$method_name$(::<$($generic),+>)?($( $arg ),*).await,
+                    Self::WebSocketTls(s) => 
s.$method_name$(::<$($generic),+>)?($( $arg ),*).await,
+                }
+            }
+        )*
+    }
+}
+
+pub trait Sender {
+    fn read<B: IoBufMut>(&mut self, buffer: B) -> impl Future<Output = 
(Result<(), IggyError>, B)>;
+    fn send_empty_ok_response(&mut self) -> impl Future<Output = Result<(), 
IggyError>>;
+    fn send_ok_response(&mut self, payload: &[u8]) -> impl Future<Output = 
Result<(), IggyError>>;
+    fn send_ok_response_vectored(
+        &mut self,
+        length: &[u8],
+        slices: Vec<PooledBuffer>,
+    ) -> impl Future<Output = Result<(), IggyError>>;
+    fn send_error_response(
+        &mut self,
+        error: IggyError,
+    ) -> impl Future<Output = Result<(), IggyError>>;
+    fn shutdown(&mut self) -> impl Future<Output = Result<(), std::io::Error>>;
+}
+
+#[allow(clippy::large_enum_variant)]
+pub enum SenderKind {
+    Tcp(TcpSender),
+    TcpTls(TcpTlsSender),
+    Quic(QuicSender),
+    WebSocket(WebSocketSender),
+    WebSocketTls(WebSocketTlsSender),
+}
+
+impl SenderKind {
+    pub fn get_tcp_sender(stream: TcpStream) -> Self {
+        Self::Tcp(TcpSender { stream })
+    }
+
+    pub fn get_tcp_tls_sender(stream: TlsStream<TcpStream>) -> Self {
+        Self::TcpTls(TcpTlsSender { stream })
+    }
+
+    pub fn get_quic_sender(send_stream: SendStream, recv_stream: RecvStream) 
-> Self {
+        Self::Quic(QuicSender {
+            send: send_stream,
+            recv: recv_stream,
+        })
+    }
+
+    pub fn get_websocket_sender(stream: WebSocketSender) -> Self {
+        Self::WebSocket(stream)
+    }
+
+    pub fn get_websocket_tls_sender(stream: WebSocketTlsSender) -> Self {
+        Self::WebSocketTls(stream)
+    }
+
+    forward_async_methods! {
+        async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<(), 
IggyError>, B);
+        async fn send_empty_ok_response(&mut self) -> Result<(), IggyError>;
+        async fn send_ok_response(&mut self, payload: &[u8]) -> Result<(), 
IggyError>;
+        async fn send_ok_response_vectored(&mut self, length: &[u8], slices: 
Vec<PooledBuffer>) -> Result<(), IggyError>;
+        async fn send_error_response(&mut self, error: IggyError) -> 
Result<(), IggyError>;
+        async fn shutdown(&mut self) -> Result<(), std::io::Error>;
+    }
+}
+
+const STATUS_OK: &[u8] = &[0; 4];
+
+pub(crate) async fn read<T, B>(stream: &mut T, buffer: B) -> (Result<(), 
IggyError>, B)
+where
+    T: AsyncReadExt + AsyncWriteExt + Unpin,
+    B: IoBufMut,
+{
+    let BufResult(result, buffer) = stream.read_exact(buffer).await;
+    match (result, buffer) {
+        (Ok(_), buffer) => (Ok(()), buffer),
+        (Err(error), buffer) => {
+            if error.kind() == std::io::ErrorKind::UnexpectedEof {
+                (Err(IggyError::ConnectionClosed), buffer)
+            } else {
+                (Err(IggyError::TcpError), buffer)
+            }
+        }
+    }
+}
+
+pub(crate) async fn send_empty_ok_response<T>(stream: &mut T) -> Result<(), 
IggyError>
+where
+    T: AsyncReadExt + AsyncWriteExt + Unpin,
+{
+    send_ok_response(stream, &[]).await
+}
+
+pub(crate) async fn send_ok_response<T>(stream: &mut T, payload: &[u8]) -> 
Result<(), IggyError>
+where
+    T: AsyncReadExt + AsyncWriteExt + Unpin,
+{
+    send_response(stream, STATUS_OK, payload).await
+}
+
+pub(crate) async fn send_ok_response_vectored<T>(
+    stream: &mut T,
+    length: &[u8],
+    slices: Vec<PooledBuffer>,
+) -> Result<(), IggyError>
+where
+    T: AsyncReadExt + AsyncWriteExt + Unpin,
+{
+    send_response_vectored(stream, STATUS_OK, length, slices).await
+}
+
+pub(crate) async fn send_error_response<T>(
+    stream: &mut T,
+    error: IggyError,
+) -> Result<(), IggyError>
+where
+    T: AsyncReadExt + AsyncWriteExt + Unpin,
+{
+    send_response(stream, &error.as_code().to_le_bytes(), &[]).await
+}
+
+pub(crate) async fn send_response<T>(
+    stream: &mut T,
+    status: &[u8],
+    payload: &[u8],
+) -> Result<(), IggyError>
+where
+    T: AsyncReadExt + AsyncWriteExt + Unpin,
+{
+    debug!(
+        "Sending response of len: {} with status: {:?}...",
+        payload.len(),
+        status
+    );
+    let length = (payload.len() as u32).to_le_bytes();
+    stream
+        .write_all([status, &length, payload].concat())
+        .await
+        .0
+        .map_err(|_| IggyError::TcpError)?;
+    debug!("Sent response with status: {:?}", status);
+    Ok(())
+}
+
+pub(crate) async fn send_response_vectored<T>(
+    stream: &mut T,
+    status: &[u8],
+    length: &[u8],
+    mut slices: Vec<PooledBuffer>,
+) -> Result<(), IggyError>
+where
+    T: AsyncReadExt + AsyncWriteExt + Unpin,
+{
+    let resp_status = u32::from_le_bytes(status.try_into().unwrap());
+    debug!(
+        "Sending vectored response of len: {} with status: {:?}...",
+        slices.len(),
+        resp_status
+    );
+    let status = PooledBuffer::from(status);
+    let length = PooledBuffer::from(length);
+    slices.splice(0..0, [status, length]);
+    stream
+        .write_vectored_all(slices)
+        .await
+        .0
+        .map_err(|_| IggyError::TcpError)?;
+    debug!("Sent response with status: {:?}", resp_status);
+    Ok(())
+}
diff --git a/core/common/src/types/sender/quic_sender.rs 
b/core/common/src/types/sender/quic_sender.rs
new file mode 100644
index 000000000..412f608ac
--- /dev/null
+++ b/core/common/src/types/sender/quic_sender.rs
@@ -0,0 +1,142 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::{PooledBuffer, Sender};
+use crate::IggyError;
+use compio::buf::IoBufMut;
+use compio::io::AsyncReadExt;
+use compio::BufResult;
+use compio_quic::{RecvStream, SendStream};
+use err_trail::ErrContext;
+use tracing::{debug, error};
+
+const COMPONENT: &str = "QUIC";
+const STATUS_OK: &[u8] = &[0; 4];
+
+#[derive(Debug)]
+pub struct QuicSender {
+    pub(crate) send: SendStream,
+    pub(crate) recv: RecvStream,
+}
+
+impl Sender for QuicSender {
+    /// Reads data from the QUIC stream directly into the buffer.
+    async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<(), 
IggyError>, B) {
+        let BufResult(result, buffer) =
+            <RecvStream as AsyncReadExt>::read_exact(&mut self.recv, 
buffer).await;
+        match (result, buffer) {
+            (Ok(_), buffer) => (Ok(()), buffer),
+            (Err(error), buffer) => {
+                error!("Failed to read from the stream: {:?}", error);
+                (Err(IggyError::QuicError), buffer)
+            }
+        }
+    }
+
+    async fn send_empty_ok_response(&mut self) -> Result<(), IggyError> {
+        self.send_ok_response(&[]).await
+    }
+
+    async fn send_ok_response(&mut self, payload: &[u8]) -> Result<(), 
IggyError> {
+        self.send_response(STATUS_OK, payload).await
+    }
+
+    async fn send_error_response(&mut self, error: IggyError) -> Result<(), 
IggyError> {
+        self.send_response(&error.as_code().to_le_bytes(), &[])
+            .await
+    }
+
+    async fn shutdown(&mut self) -> Result<(), std::io::Error> {
+        Ok(())
+    }
+
+    async fn send_ok_response_vectored(
+        &mut self,
+        length: &[u8],
+        slices: Vec<PooledBuffer>,
+    ) -> Result<(), IggyError> {
+        debug!("Sending vectored response with status: {:?}...", STATUS_OK);
+
+        let headers = [STATUS_OK, length].concat();
+        self.send
+            .write_all(&headers)
+            .await
+            .with_error(|error| {
+                format!("{COMPONENT} (error: {error}) - failed to write 
headers to stream")
+            })
+            .map_err(|_| IggyError::QuicError)?;
+
+        let mut total_bytes_written = 0;
+
+        for slice in slices {
+            let slice_data = &*slice;
+            if !slice_data.is_empty() {
+                self.send
+                    .write_all(slice_data)
+                    .await
+                    .with_error(|error| {
+                        format!("{COMPONENT} (error: {error}) - failed to 
write slice to stream")
+                    })
+                    .map_err(|_| IggyError::QuicError)?;
+
+                total_bytes_written += slice_data.len();
+            }
+        }
+
+        debug!(
+            "Sent vectored response: {} bytes of payload",
+            total_bytes_written
+        );
+
+        self.send
+            .finish()
+            .with_error(|error| {
+                format!("{COMPONENT} (error: {error}) - failed to finish send 
stream")
+            })
+            .map_err(|_| IggyError::QuicError)?;
+
+        debug!("Sent vectored response with status: {:?}", STATUS_OK);
+        Ok(())
+    }
+}
+
+impl QuicSender {
+    async fn send_response(&mut self, status: &[u8], payload: &[u8]) -> 
Result<(), IggyError> {
+        debug!(
+            "Sending response of len: {} with status: {:?}...",
+            payload.len(),
+            status
+        );
+        let length = (payload.len() as u32).to_le_bytes();
+        self.send
+            .write_all(&[status, &length, payload].as_slice().concat())
+            .await
+            .with_error(|error| {
+                format!("{COMPONENT} (error: {error}) - failed to write buffer 
to the stream")
+            })
+            .map_err(|_| IggyError::QuicError)?;
+        self.send
+            .finish()
+            .with_error(|error| {
+                format!("{COMPONENT} (error: {error}) - failed to finish send 
stream")
+            })
+            .map_err(|_| IggyError::QuicError)?;
+        debug!("Sent response with status: {:?}", status);
+        Ok(())
+    }
+}
diff --git a/core/common/src/types/sender/tcp_sender.rs 
b/core/common/src/types/sender/tcp_sender.rs
new file mode 100644
index 000000000..b3596039b
--- /dev/null
+++ b/core/common/src/types/sender/tcp_sender.rs
@@ -0,0 +1,66 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::{PooledBuffer, Sender};
+use crate::IggyError;
+use compio::buf::IoBufMut;
+use compio::io::AsyncWrite;
+use compio::net::TcpStream;
+use err_trail::ErrContext;
+
+const COMPONENT: &str = "TCP";
+
+#[derive(Debug)]
+pub struct TcpSender {
+    pub(crate) stream: TcpStream,
+}
+
+impl Sender for TcpSender {
+    async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<(), 
IggyError>, B) {
+        super::read(&mut self.stream, buffer).await
+    }
+
+    async fn send_empty_ok_response(&mut self) -> Result<(), IggyError> {
+        super::send_empty_ok_response(&mut self.stream).await
+    }
+
+    async fn send_ok_response(&mut self, payload: &[u8]) -> Result<(), 
IggyError> {
+        super::send_ok_response(&mut self.stream, payload).await
+    }
+
+    async fn send_error_response(&mut self, error: IggyError) -> Result<(), 
IggyError> {
+        super::send_error_response(&mut self.stream, error).await
+    }
+
+    async fn shutdown(&mut self) -> Result<(), std::io::Error> {
+        self.stream
+            .shutdown()
+            .await
+            .with_error(|error| {
+                format!("{COMPONENT} (error: {error}) - failed to shutdown TCP 
stream")
+            })
+    }
+
+    async fn send_ok_response_vectored(
+        &mut self,
+        length: &[u8],
+        slices: Vec<PooledBuffer>,
+    ) -> Result<(), IggyError> {
+        super::send_ok_response_vectored(&mut self.stream, length, 
slices).await
+    }
+}
diff --git a/core/common/src/types/sender/tcp_tls_sender.rs 
b/core/common/src/types/sender/tcp_tls_sender.rs
new file mode 100644
index 000000000..d2c070a7a
--- /dev/null
+++ b/core/common/src/types/sender/tcp_tls_sender.rs
@@ -0,0 +1,87 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::{PooledBuffer, Sender};
+use crate::IggyError;
+use compio::buf::IoBufMut;
+use compio::io::AsyncWrite;
+use compio::net::TcpStream;
+use compio_tls::TlsStream;
+use err_trail::ErrContext;
+
+const COMPONENT: &str = "TCP";
+
+#[derive(Debug)]
+pub struct TcpTlsSender {
+    pub(crate) stream: TlsStream<TcpStream>,
+}
+
+impl Sender for TcpTlsSender {
+    async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<(), 
IggyError>, B) {
+        super::read(&mut self.stream, buffer).await
+    }
+
+    async fn send_empty_ok_response(&mut self) -> Result<(), IggyError> {
+        super::send_empty_ok_response(&mut self.stream).await?;
+        self.stream
+            .flush()
+            .await
+            .with_error(|e| format!("failed to flush TCP stream after sending 
response: {e}"))
+            .map_err(|_| IggyError::TcpError)
+    }
+
+    async fn send_ok_response(&mut self, payload: &[u8]) -> Result<(), 
IggyError> {
+        super::send_ok_response(&mut self.stream, payload).await?;
+        self.stream
+            .flush()
+            .await
+            .with_error(|e| format!("failed to flush TCP stream after sending 
response: {e}"))
+            .map_err(|_| IggyError::TcpError)
+    }
+
+    async fn send_error_response(&mut self, error: IggyError) -> Result<(), 
IggyError> {
+        super::send_error_response(&mut self.stream, error).await?;
+        self.stream
+            .flush()
+            .await
+            .with_error(|e| format!("failed to flush TCP stream after sending 
response: {e}"))
+            .map_err(|_| IggyError::TcpError)
+    }
+
+    async fn shutdown(&mut self) -> Result<(), std::io::Error> {
+        self.stream
+            .shutdown()
+            .await
+            .with_error(|error| {
+                format!("{COMPONENT} (error: {error}) - failed to shutdown TCP 
TLS stream")
+            })
+    }
+
+    async fn send_ok_response_vectored(
+        &mut self,
+        length: &[u8],
+        slices: Vec<PooledBuffer>,
+    ) -> Result<(), IggyError> {
+        super::send_ok_response_vectored(&mut self.stream, length, 
slices).await?;
+        self.stream
+            .flush()
+            .await
+            .with_error(|e| format!("failed to flush TCP stream after sending 
response: {e}"))
+            .map_err(|_| IggyError::TcpError)
+    }
+}
diff --git a/core/common/src/types/sender/websocket_sender.rs 
b/core/common/src/types/sender/websocket_sender.rs
new file mode 100644
index 000000000..7804660a7
--- /dev/null
+++ b/core/common/src/types/sender/websocket_sender.rs
@@ -0,0 +1,208 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::{Sender};
+use crate::IggyError;
+use crate::alloc::buffer::PooledBuffer;
+use bytes::{BufMut, BytesMut};
+use compio::buf::IoBufMut;
+use compio::net::TcpStream;
+use compio_ws::TungsteniteError;
+use compio_ws::{WebSocketMessage as Message, WebSocketStream};
+use std::ptr;
+use tracing::{debug, warn};
+
/// Initial capacity, in bytes, of the read staging buffer.
const READ_BUFFER_CAPACITY: usize = 8192;
/// Initial capacity, in bytes, of the write staging buffer.
const WRITE_BUFFER_CAPACITY: usize = 8192;
/// Wire status code for a successful response: four zero bytes.
const STATUS_OK: &[u8] = &[0; 4];

/// Plain (non-TLS) WebSocket transport sender.
///
/// Incoming binary frames are accumulated in `read_buffer` until a caller's
/// read request can be satisfied; outgoing responses are staged in
/// `write_buffer` and sent as single binary frames.
pub struct WebSocketSender {
    // Underlying WebSocket over a raw TCP stream.
    pub(crate) stream: WebSocketStream<TcpStream>,
    // Bytes received from binary frames but not yet consumed by `read`.
    pub(crate) read_buffer: BytesMut,
    // Response bytes staged until the next flush.
    pub(crate) write_buffer: BytesMut,
}
+
+impl WebSocketSender {
+    pub fn new(stream: WebSocketStream<TcpStream>) -> Self {
+        Self {
+            stream,
+            read_buffer: BytesMut::with_capacity(READ_BUFFER_CAPACITY),
+            write_buffer: BytesMut::with_capacity(WRITE_BUFFER_CAPACITY),
+        }
+    }
+
+    async fn flush_write_buffer(&mut self) -> Result<(), IggyError> {
+        if self.write_buffer.is_empty() {
+            return Ok(());
+        }
+        let data = self.write_buffer.split().freeze();
+        debug!("WebSocket sending data: {:?}", data.to_vec());
+
+        self.stream.send(Message::Binary(data)).await.map_err(|e| {
+            debug!("WebSocket send error: {:?}", e);
+            match e {
+                TungsteniteError::ConnectionClosed | 
TungsteniteError::AlreadyClosed => {
+                    IggyError::ConnectionClosed
+                }
+                TungsteniteError::Io(ref io_err)
+                    if io_err.kind() == std::io::ErrorKind::BrokenPipe =>
+                {
+                    warn!("Broken pipe detected (client closed connection)");
+                    IggyError::ConnectionClosed
+                }
+                _ => IggyError::TcpError,
+            }
+        })
+    }
+}
+
+impl std::fmt::Debug for WebSocketSender {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("WebSocketSender").finish()
+    }
+}
+
impl Sender for WebSocketSender {
    /// Fills `buffer` to its full capacity with payload bytes extracted from
    /// incoming binary WebSocket frames.
    ///
    /// Frames accumulate in `read_buffer` until enough bytes are available;
    /// `Ping` frames are answered with `Pong`, other non-binary frames are
    /// ignored. Returns `ConnectionClosed` on a close frame or any transport
    /// error. The buffer is handed back alongside the result so the caller
    /// keeps ownership either way.
    async fn read<B: IoBufMut>(&mut self, mut buffer: B) -> (Result<(), IggyError>, B) {
        let required_len = buffer.buf_capacity();
        if required_len == 0 {
            return (Ok(()), buffer);
        }

        while self.read_buffer.len() < required_len {
            match self.stream.read().await {
                Ok(Message::Binary(data)) => {
                    self.read_buffer.extend_from_slice(&data);
                }
                Ok(Message::Close(_)) => {
                    return (Err(IggyError::ConnectionClosed), buffer);
                }
                Ok(Message::Ping(data)) => {
                    // Keep the connection alive mid-read; a failed pong means
                    // the peer is gone.
                    if self.stream.send(Message::Pong(data)).await.is_err() {
                        return (Err(IggyError::ConnectionClosed), buffer);
                    }
                }
                Ok(_) => { /* Ignore other message types */ }
                Err(_) => {
                    return (Err(IggyError::ConnectionClosed), buffer);
                }
            }
        }

        // Detach exactly the requested bytes; any surplus stays buffered for
        // the next read call.
        let data_to_copy = self.read_buffer.split_to(required_len);

        // SAFETY: `data_to_copy` holds exactly `required_len` initialized
        // bytes, and `required_len == buffer.buf_capacity()` (taken above),
        // so the destination has room for the copy. The source is this
        // sender's own `read_buffer` allocation while the destination is the
        // caller-supplied buffer, so the regions are distinct.
        // `set_buf_init` then records how many bytes were initialized.
        unsafe {
            ptr::copy_nonoverlapping(data_to_copy.as_ptr(), buffer.as_buf_mut_ptr(), required_len);
            buffer.set_buf_init(required_len);
        }

        (Ok(()), buffer)
    }

    /// Sends an OK response with an empty payload.
    async fn send_empty_ok_response(&mut self) -> Result<(), IggyError> {
        self.send_ok_response(&[]).await
    }

    /// Sends an OK response: 4-byte OK status, little-endian u32 payload
    /// length, then the payload, flushed as a single binary frame.
    async fn send_ok_response(&mut self, payload: &[u8]) -> Result<(), IggyError> {
        debug!(
            "Sending WebSocket response with status: OK, payload length: {}",
            payload.len()
        );

        let length = (payload.len() as u32).to_le_bytes();
        let total_size = STATUS_OK.len() + length.len() + payload.len();

        // Flush first if appending would outgrow the staging buffer's
        // capacity, so `put_slice` does not force a reallocation.
        if self.write_buffer.len() + total_size > self.write_buffer.capacity() {
            self.flush_write_buffer().await?;
        }

        self.write_buffer.put_slice(STATUS_OK);
        self.write_buffer.put_slice(&length);
        self.write_buffer.put_slice(payload);

        self.flush_write_buffer().await
    }

    /// Sends an error response: little-endian error code followed by a zero
    /// payload length.
    async fn send_error_response(&mut self, error: IggyError) -> Result<(), IggyError> {
        let status = &error.as_code().to_le_bytes();
        debug!("Sending WebSocket error response with status: {:?}", status);
        let length = 0u32.to_le_bytes();
        let total_size = status.len() + length.len();

        if self.write_buffer.len() + total_size > self.write_buffer.capacity() {
            self.flush_write_buffer().await?;
        }
        self.write_buffer.put_slice(status);
        self.write_buffer.put_slice(&length);
        self.flush_write_buffer().await
    }

    /// Flushes any pending bytes, then performs the WebSocket close
    /// handshake. An already-closed connection counts as success.
    async fn shutdown(&mut self) -> Result<(), std::io::Error> {
        self.flush_write_buffer()
            .await
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;

        match self.stream.close(None).await {
            Ok(_) => Ok(()),
            Err(e) => match e {
                TungsteniteError::ConnectionClosed | TungsteniteError::AlreadyClosed => {
                    debug!("WebSocket connection already closed: {}", e);
                    Ok(())
                }
                _ => Err(std::io::Error::new(
                    std::io::ErrorKind::Other,
                    format!("Failed to close WebSocket connection: {}", e),
                )),
            },
        }
    }

    /// Sends an OK response whose payload is already split across pooled
    /// buffers: status + caller-provided length header + each slice,
    /// coalesced into one binary frame (WebSocket framing offers no true
    /// vectored send).
    async fn send_ok_response_vectored(
        &mut self,
        length: &[u8],
        slices: Vec<PooledBuffer>,
    ) -> Result<(), IggyError> {
        // Flush first to preserve ordering with any previously staged bytes.
        self.flush_write_buffer().await?;

        let total_payload_size = slices.iter().map(|s| s.len()).sum::<usize>();
        let total_size = STATUS_OK.len() + length.len() + total_payload_size;

        let mut response_bytes = BytesMut::with_capacity(total_size);
        response_bytes.put_slice(STATUS_OK);
        response_bytes.put_slice(length);
        for slice in slices {
            response_bytes.put_slice(&slice);
        }

        self.stream
            .send(Message::Binary(response_bytes.freeze()))
            .await
            .map_err(|e| match e {
                TungsteniteError::ConnectionClosed | TungsteniteError::AlreadyClosed => {
                    IggyError::ConnectionClosed
                }
                TungsteniteError::Io(ref io_err)
                    if io_err.kind() == std::io::ErrorKind::BrokenPipe =>
                {
                    warn!("Broken pipe in vectored send - client closed connection");
                    IggyError::ConnectionClosed
                }
                _ => IggyError::TcpError,
            })
    }
}
diff --git a/core/common/src/types/sender/websocket_tls_sender.rs 
b/core/common/src/types/sender/websocket_tls_sender.rs
new file mode 100644
index 000000000..0ffc77aba
--- /dev/null
+++ b/core/common/src/types/sender/websocket_tls_sender.rs
@@ -0,0 +1,188 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::Sender;
+use crate::IggyError;
+use crate::alloc::buffer::PooledBuffer;
+use bytes::{BufMut, BytesMut};
+use compio::buf::IoBufMut;
+use compio::net::TcpStream;
+use compio_tls::TlsStream;
+use compio_ws::TungsteniteError;
+use compio_ws::{WebSocketMessage as Message, WebSocketStream};
+use std::ptr;
+use tracing::debug;
+
/// Initial capacity, in bytes, of the read staging buffer.
const READ_BUFFER_CAPACITY: usize = 8192;
/// Initial capacity, in bytes, of the write staging buffer.
const WRITE_BUFFER_CAPACITY: usize = 8192;
/// Wire status code for a successful response: four zero bytes.
const STATUS_OK: &[u8] = &[0; 4];

/// TLS-wrapped WebSocket transport sender.
///
/// Same buffering scheme as the plain `WebSocketSender`, but the WebSocket
/// runs over a `TlsStream`.
pub struct WebSocketTlsSender {
    // Underlying WebSocket over a TLS-wrapped TCP stream.
    pub(crate) stream: WebSocketStream<TlsStream<TcpStream>>,
    // Bytes received from binary frames but not yet consumed by `read`.
    pub(crate) read_buffer: BytesMut,
    // Response bytes staged until the next flush.
    pub(crate) write_buffer: BytesMut,
}
+
+impl WebSocketTlsSender {
+    pub fn new(stream: WebSocketStream<TlsStream<TcpStream>>) -> Self {
+        Self {
+            stream,
+            read_buffer: BytesMut::with_capacity(READ_BUFFER_CAPACITY),
+            write_buffer: BytesMut::with_capacity(WRITE_BUFFER_CAPACITY),
+        }
+    }
+
+    async fn flush_write_buffer(&mut self) -> Result<(), IggyError> {
+        if self.write_buffer.is_empty() {
+            return Ok(());
+        }
+        let data = self.write_buffer.split().freeze();
+        self.stream
+            .send(Message::Binary(data))
+            .await
+            .map_err(|_| IggyError::TcpError)
+    }
+}
+
+impl std::fmt::Debug for WebSocketTlsSender {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("WebSocketTlsSender").finish()
+    }
+}
+
impl Sender for WebSocketTlsSender {
    /// Fills `buffer` to its full capacity with payload bytes extracted from
    /// incoming binary WebSocket frames (over TLS).
    ///
    /// Frames accumulate in `read_buffer` until enough bytes are available;
    /// `Ping` frames are answered with `Pong`, other non-binary frames are
    /// ignored. Returns `ConnectionClosed` on a close frame or any transport
    /// error. The buffer is handed back alongside the result so the caller
    /// keeps ownership either way.
    async fn read<B: IoBufMut>(&mut self, mut buffer: B) -> (Result<(), IggyError>, B) {
        let required_len = buffer.buf_capacity();
        if required_len == 0 {
            return (Ok(()), buffer);
        }

        while self.read_buffer.len() < required_len {
            match self.stream.read().await {
                Ok(Message::Binary(data)) => {
                    self.read_buffer.extend_from_slice(&data);
                }
                Ok(Message::Close(_)) => {
                    return (Err(IggyError::ConnectionClosed), buffer);
                }
                Ok(Message::Ping(data)) => {
                    // Keep the connection alive mid-read; a failed pong means
                    // the peer is gone.
                    if self.stream.send(Message::Pong(data)).await.is_err() {
                        return (Err(IggyError::ConnectionClosed), buffer);
                    }
                }
                Ok(_) => { /* Ignore other message types */ }
                Err(_) => {
                    return (Err(IggyError::ConnectionClosed), buffer);
                }
            }
        }

        // Detach exactly the requested bytes; any surplus stays buffered for
        // the next read call.
        let data_to_copy = self.read_buffer.split_to(required_len);

        // SAFETY: `data_to_copy` holds exactly `required_len` initialized
        // bytes, and `required_len == buffer.buf_capacity()` (taken above),
        // so the destination has room for the copy. The source is this
        // sender's own `read_buffer` allocation while the destination is the
        // caller-supplied buffer, so the regions are distinct.
        // `set_buf_init` then records how many bytes were initialized.
        unsafe {
            ptr::copy_nonoverlapping(data_to_copy.as_ptr(), buffer.as_buf_mut_ptr(), required_len);
            buffer.set_buf_init(required_len);
        }

        (Ok(()), buffer)
    }

    /// Sends an OK response with an empty payload.
    async fn send_empty_ok_response(&mut self) -> Result<(), IggyError> {
        self.send_ok_response(&[]).await
    }

    /// Sends an OK response: 4-byte OK status, little-endian u32 payload
    /// length, then the payload, flushed as a single binary frame.
    async fn send_ok_response(&mut self, payload: &[u8]) -> Result<(), IggyError> {
        debug!(
            "Sending WebSocket TLS response with status: OK, payload length: {}",
            payload.len()
        );

        let length = (payload.len() as u32).to_le_bytes();
        let total_size = STATUS_OK.len() + length.len() + payload.len();

        // Flush first if appending would outgrow the staging buffer's
        // capacity, so `put_slice` does not force a reallocation.
        if self.write_buffer.len() + total_size > self.write_buffer.capacity() {
            self.flush_write_buffer().await?;
        }

        self.write_buffer.put_slice(STATUS_OK);
        self.write_buffer.put_slice(&length);
        self.write_buffer.put_slice(payload);

        self.flush_write_buffer().await
    }

    /// Sends an error response: little-endian error code followed by a zero
    /// payload length.
    async fn send_error_response(&mut self, error: IggyError) -> Result<(), IggyError> {
        let status = &error.as_code().to_le_bytes();
        debug!(
            "Sending WebSocket TLS error response with status: {:?}",
            status
        );
        let length = 0u32.to_le_bytes();
        let total_size = status.len() + length.len();

        if self.write_buffer.len() + total_size > self.write_buffer.capacity() {
            self.flush_write_buffer().await?;
        }
        self.write_buffer.put_slice(status);
        self.write_buffer.put_slice(&length);
        self.flush_write_buffer().await
    }

    /// Flushes any pending bytes, then performs the WebSocket close
    /// handshake. An already-closed connection counts as success.
    async fn shutdown(&mut self) -> Result<(), std::io::Error> {
        self.flush_write_buffer()
            .await
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;

        match self.stream.close(None).await {
            Ok(_) => Ok(()),
            Err(e) => match e {
                TungsteniteError::ConnectionClosed | TungsteniteError::AlreadyClosed => {
                    debug!("WebSocket TLS connection already closed: {}", e);
                    Ok(())
                }
                _ => Err(std::io::Error::new(
                    std::io::ErrorKind::Other,
                    format!("Failed to close WebSocket TLS connection: {}", e),
                )),
            },
        }
    }

    /// Sends an OK response whose payload is already split across pooled
    /// buffers: status + caller-provided length header + each slice,
    /// coalesced into one binary frame.
    async fn send_ok_response_vectored(
        &mut self,
        length: &[u8],
        slices: Vec<PooledBuffer>,
    ) -> Result<(), IggyError> {
        // Flush first to preserve ordering with any previously staged bytes.
        self.flush_write_buffer().await?;

        let total_payload_size = slices.iter().map(|s| s.len()).sum::<usize>();
        let total_size = STATUS_OK.len() + length.len() + total_payload_size;

        let mut response_bytes = BytesMut::with_capacity(total_size);
        response_bytes.put_slice(STATUS_OK);
        response_bytes.put_slice(length);
        for slice in slices {
            response_bytes.put_slice(&slice);
        }

        self.stream
            .send(Message::Binary(response_bytes.freeze()))
            .await
            // NOTE(review): unlike the non-TLS `WebSocketSender`, this maps
            // every send failure (including a closed connection) to
            // `TcpError` — confirm whether closed/broken-pipe cases should
            // become `ConnectionClosed` here as well.
            .map_err(|_| IggyError::TcpError)
    }
}
diff --git a/core/message_bus/src/lib.rs b/core/message_bus/src/lib.rs
index b6c4b5de2..f40fc72d4 100644
--- a/core/message_bus/src/lib.rs
+++ b/core/message_bus/src/lib.rs
@@ -17,6 +17,9 @@
 
 pub trait MessageBus {}
 
-pub struct IggyMessageBus;
+pub struct IggyMessageBus {
+    client_connections: HashMap<u128, ClientConnection>
+
+}
 
 impl MessageBus for IggyMessageBus {}
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 6fa817948..7a2503b22 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -63,8 +63,8 @@ dashmap = { workspace = true }
 derive_more = { workspace = true }
 dotenvy = { workspace = true }
 enum_dispatch = { workspace = true }
-err_trail = { version = "0.10.2" }
-error_set = { version = "0.9.0" }
+err_trail = { workspace = true }
+error_set = { workspace = true }
 figlet-rs = { workspace = true }
 figment = { workspace = true }
 flume = { workspace = true }
diff --git a/core/server/src/configs/defaults.rs 
b/core/server/src/configs/defaults.rs
index 9849dc8fc..2b205252c 100644
--- a/core/server/src/configs/defaults.rs
+++ b/core/server/src/configs/defaults.rs
@@ -17,7 +17,6 @@
  */
 
 use super::sharding::ShardingConfig;
-use super::system::MemoryPoolConfig;
 use super::tcp::TcpSocketConfig;
 use crate::configs::cluster::CurrentNodeConfig;
 use crate::configs::cluster::{ClusterConfig, NodeConfig, OtherNodeConfig, 
TransportPorts};
@@ -37,7 +36,7 @@ use crate::configs::system::{
 };
 use crate::configs::tcp::{TcpConfig, TcpTlsConfig};
 use crate::configs::websocket::{WebSocketConfig, WebSocketTlsConfig};
-use iggy_common::IggyByteSize;
+use iggy_common::{IggyByteSize, MemoryPoolConfig};
 use iggy_common::IggyDuration;
 use std::sync::Arc;
 use std::time::Duration;
diff --git a/core/server/src/configs/system.rs 
b/core/server/src/configs/system.rs
index fae172ca8..d0060818d 100644
--- a/core/server/src/configs/system.rs
+++ b/core/server/src/configs/system.rs
@@ -135,13 +135,6 @@ pub struct RecoveryConfig {
     pub recreate_missing_state: bool,
 }
 
-#[derive(Debug, Deserialize, Serialize)]
-pub struct MemoryPoolConfig {
-    pub enabled: bool,
-    pub size: IggyByteSize,
-    pub bucket_capacity: u32,
-}
-
 #[serde_as]
 #[derive(Debug, Deserialize, Serialize)]
 pub struct SegmentConfig {

Reply via email to