This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch message_bus_impl in repository https://gitbox.apache.org/repos/asf/iggy.git
commit afdfbb6d837f9d742055333f5c59c784782a0ad0 Author: numminex <[email protected]> AuthorDate: Mon Dec 1 16:03:30 2025 +0100 Seomthing --- Cargo.lock | 9 + Cargo.toml | 2 + core/common/Cargo.toml | 9 + core/common/src/alloc/buffer.rs | 285 ++++++++ core/common/src/alloc/memory_pool.rs | 737 +++++++++++++++++++++ core/common/src/alloc/mod.rs | 21 + core/common/src/lib.rs | 3 + core/common/src/types/mod.rs | 1 + core/common/src/types/sender/mod.rs | 230 +++++++ core/common/src/types/sender/quic_sender.rs | 142 ++++ core/common/src/types/sender/tcp_sender.rs | 66 ++ core/common/src/types/sender/tcp_tls_sender.rs | 87 +++ core/common/src/types/sender/websocket_sender.rs | 208 ++++++ .../src/types/sender/websocket_tls_sender.rs | 188 ++++++ core/message_bus/src/lib.rs | 5 +- core/server/Cargo.toml | 4 +- core/server/src/configs/defaults.rs | 3 +- core/server/src/configs/system.rs | 7 - 18 files changed, 1995 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e2eb2ae3e..fc4d14e87 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4764,10 +4764,19 @@ dependencies = [ "chrono", "clap", "comfy-table", + "compio", + "compio-quic", + "compio-tls", + "compio-ws", + "crossbeam", "derive_more 2.0.1", + "err_trail", + "error_set", "fast-async-mutex", "figment", + "human-repr", "humantime", + "once_cell", "rcgen", "rustls", "serde", diff --git a/Cargo.toml b/Cargo.toml index 8254764e3..8f9752608 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -122,6 +122,8 @@ dlopen2 = "0.8.0" dotenvy = "0.15.7" enum_dispatch = "0.3.13" env_logger = "0.11.8" +err_trail = "0.10.2" +error_set = "0.9.0" figlet-rs = "0.1.5" figment = { version = "0.10.19", features = ["toml", "env"] } flume = "0.11.1" diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml index f4bf769ec..71b3f5157 100644 --- a/core/common/Cargo.toml +++ b/core/common/Cargo.toml @@ -43,8 +43,11 @@ bytes = { workspace = true } chrono = { workspace = true } clap = { workspace = true } comfy-table = { workspace = 
true } +crossbeam = { workspace = true } derive_more = { workspace = true } fast-async-mutex = { version = "0.6.7", optional = true } +human-repr = { workspace = true } +once_cell = { workspace = true } figment = { workspace = true } humantime = { workspace = true } rcgen = "0.14.5" @@ -59,3 +62,9 @@ toml = { workspace = true } tracing = { workspace = true } tungstenite = { workspace = true } twox-hash = { workspace = true } +compio = { workspace = true } +compio-quic = { workspace = true } +compio-tls = { workspace = true } +compio-ws = { workspace = true } +err_trail = { workspace = true } +error_set = { workspace = true } diff --git a/core/common/src/alloc/buffer.rs b/core/common/src/alloc/buffer.rs new file mode 100644 index 000000000..f8ff1186b --- /dev/null +++ b/core/common/src/alloc/buffer.rs @@ -0,0 +1,285 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use super::memory_pool::{BytesMutExt, memory_pool}; +use bytes::{Buf, BufMut, BytesMut}; +use compio::buf::{IoBuf, IoBufMut, SetBufInit}; +use std::ops::{Deref, DerefMut}; + +/// A buffer wrapper that participates in memory pooling. +/// +/// This buffer automatically acquires memory from the global memory pool +/// and returns it when dropped. 
It also tracks resize events to keep +/// pool accounting accurate. +#[derive(Debug)] +pub struct PooledBuffer { + from_pool: bool, + original_capacity: usize, + original_bucket_idx: Option<usize>, + inner: BytesMut, +} + +impl Default for PooledBuffer { + fn default() -> Self { + Self::empty() + } +} + +impl PooledBuffer { + /// Creates a new pooled buffer with the specified capacity. + /// + /// # Arguments + /// + /// * `capacity` - The capacity of the buffer + pub fn with_capacity(capacity: usize) -> Self { + let (buffer, was_pool_allocated) = memory_pool().acquire_buffer(capacity); + let original_capacity = buffer.capacity(); + let original_bucket_idx = if was_pool_allocated { + memory_pool().best_fit(original_capacity) + } else { + None + }; + Self { + from_pool: was_pool_allocated, + original_capacity, + original_bucket_idx, + inner: buffer, + } + } + + /// Creates a new pooled buffer from an existing `BytesMut`. + /// + /// # Arguments + /// + /// * `existing` - The existing `BytesMut` buffer + pub fn from_existing(existing: BytesMut) -> Self { + Self { + from_pool: false, + original_capacity: existing.capacity(), + original_bucket_idx: None, + inner: existing, + } + } + + /// Creates an empty pooled buffer. + pub fn empty() -> Self { + Self { + from_pool: false, + original_capacity: 0, + original_bucket_idx: None, + inner: BytesMut::new(), + } + } + + /// Checks if the buffer needs to be resized and updates the memory pool accordingly. + /// This shall be called after operations that might cause a resize. 
+ pub fn check_for_resize(&mut self) { + if !self.from_pool { + return; + } + + let current_capacity = self.inner.capacity(); + if current_capacity != self.original_capacity { + memory_pool().inc_resize_events(); + + if let Some(orig_idx) = self.original_bucket_idx { + memory_pool().dec_bucket_in_use(orig_idx); + + if let Some(new_idx) = memory_pool().best_fit(current_capacity) { + // Track as a new allocation in the new bucket + memory_pool().inc_bucket_alloc(new_idx); + memory_pool().inc_bucket_in_use(new_idx); + self.original_bucket_idx = Some(new_idx); + } else { + // Track as an external allocation if no bucket fits + memory_pool().inc_external_allocations(); + self.original_bucket_idx = None; + } + } + + self.original_capacity = current_capacity; + } + } + + /// Wrapper for reserve which might cause resize + pub fn reserve(&mut self, additional: usize) { + let before_cap = self.inner.capacity(); + self.inner.reserve(additional); + + if self.inner.capacity() != before_cap { + self.check_for_resize(); + } + } + + /// Wrapper for extend_from_slice which might cause resize + pub fn extend_from_slice(&mut self, extend_from: &[u8]) { + let before_cap = self.inner.capacity(); + self.inner.extend_from_slice(extend_from); + + if self.inner.capacity() != before_cap { + self.check_for_resize(); + } + } + + /// Wrapper for put_bytes which might cause resize + pub fn put_bytes(&mut self, byte: u8, len: usize) { + let before_cap = self.inner.capacity(); + self.inner.put_bytes(byte, len); + + if self.inner.capacity() != before_cap { + self.check_for_resize(); + } + } + + /// Wrapper for put_slice which might cause resize + pub fn put_slice(&mut self, src: &[u8]) { + let before_cap = self.inner.capacity(); + self.inner.put_slice(src); + + if self.inner.capacity() != before_cap { + self.check_for_resize(); + } + } + + /// Wrapper for put_u32_le which might cause resize + pub fn put_u32_le(&mut self, value: u32) { + let before_cap = self.inner.capacity(); + 
self.inner.put_u32_le(value); + + if self.inner.capacity() != before_cap { + self.check_for_resize(); + } + } + + /// Wrapper for put_u64_le which might cause resize + pub fn put_u64_le(&mut self, value: u64) { + let before_cap = self.inner.capacity(); + self.inner.put_u64_le(value); + + if self.inner.capacity() != before_cap { + self.check_for_resize(); + } + } + + /// Returns the capacity of the inner buffer + pub fn capacity(&self) -> usize { + self.inner.capacity() + } + + /// Returns the length of the inner buffer + pub fn len(&self) -> usize { + self.inner.len() + } + + /// Returns true if the buffer is empty + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + /// Consumes the PooledBuffer and returns the inner BytesMut. + /// Note: This bypasses pool return logic, use with caution. + pub fn into_inner(self) -> BytesMut { + let mut this = std::mem::ManuallyDrop::new(self); + std::mem::take(&mut this.inner) + } +} + +impl Deref for PooledBuffer { + type Target = BytesMut; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl DerefMut for PooledBuffer { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl Drop for PooledBuffer { + fn drop(&mut self) { + if self.from_pool { + let buf = std::mem::take(&mut self.inner); + buf.return_to_pool(self.original_capacity, true); + } + } +} + +impl From<&[u8]> for PooledBuffer { + fn from(slice: &[u8]) -> Self { + let mut buf = PooledBuffer::with_capacity(slice.len()); + buf.inner.extend_from_slice(slice); + buf + } +} + +impl From<BytesMut> for PooledBuffer { + fn from(bytes: BytesMut) -> Self { + Self::from_existing(bytes) + } +} + +impl Buf for PooledBuffer { + fn remaining(&self) -> usize { + self.inner.remaining() + } + + fn chunk(&self) -> &[u8] { + self.inner.chunk() + } + + fn advance(&mut self, cnt: usize) { + self.inner.advance(cnt) + } + + fn chunks_vectored<'t>(&'t self, dst: &mut [std::io::IoSlice<'t>]) -> usize { + self.inner.chunks_vectored(dst) + } 
+} + +impl SetBufInit for PooledBuffer { + unsafe fn set_buf_init(&mut self, len: usize) { + if self.inner.len() <= len { + unsafe { + self.inner.set_len(len); + } + } + } +} + +unsafe impl IoBufMut for PooledBuffer { + fn as_buf_mut_ptr(&mut self) -> *mut u8 { + self.inner.as_mut_ptr() + } +} + +unsafe impl IoBuf for PooledBuffer { + fn as_buf_ptr(&self) -> *const u8 { + self.inner.as_buf_ptr() + } + + fn buf_len(&self) -> usize { + self.inner.len() + } + + fn buf_capacity(&self) -> usize { + self.inner.capacity() + } +} \ No newline at end of file diff --git a/core/common/src/alloc/memory_pool.rs b/core/common/src/alloc/memory_pool.rs new file mode 100644 index 000000000..b696a8b6b --- /dev/null +++ b/core/common/src/alloc/memory_pool.rs @@ -0,0 +1,737 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use bytes::BytesMut; +use crossbeam::queue::ArrayQueue; +use human_repr::HumanCount; +use once_cell::sync::OnceCell; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use tracing::{info, trace, warn}; + +/// Global memory pool instance. Use `memory_pool()` to access it. 
+pub static MEMORY_POOL: OnceCell<MemoryPool> = OnceCell::new(); + +/// Total number of distinct bucket sizes. +const NUM_BUCKETS: usize = 32; + +/// Array of bucket sizes in ascending order. Each entry is a distinct buffer size (in bytes). +const BUCKET_SIZES: [usize; NUM_BUCKETS] = [ + 256, + 512, + 1024, + 2 * 1024, + 4 * 1024, + 8 * 1024, + 16 * 1024, + 32 * 1024, + 64 * 1024, + 128 * 1024, + 256 * 1024, + 512 * 1024, + 768 * 1024, + 1024 * 1024, + 1536 * 1024, + 2 * 1024 * 1024, // Above 2MiB everything should be rounded up to the next power of 2 to take advantage of hugepages + 4 * 1024 * 1024, // (environment variables MIMALLOC_ALLOW_LARGE_OS_PAGES=1 and MIMALLOC_LARGE_OS_PAGES=1). + 6 * 1024 * 1024, + 8 * 1024 * 1024, + 10 * 1024 * 1024, + 12 * 1024 * 1024, + 16 * 1024 * 1024, + 24 * 1024 * 1024, + 32 * 1024 * 1024, + 48 * 1024 * 1024, + 64 * 1024 * 1024, + 96 * 1024 * 1024, + 128 * 1024 * 1024, + 192 * 1024 * 1024, + 256 * 1024 * 1024, + 384 * 1024 * 1024, + 512 * 1024 * 1024, +]; + +/// Retrieve the global `MemoryPool` instance. Panics if not yet initialized. +pub fn memory_pool() -> &'static MemoryPool { + MEMORY_POOL + .get() + .expect("Memory pool not initialized - MemoryPool::init_pool should be called first") +} + +/// Configuration for the memory pool. +#[derive(Deserialize, Serialize, Debug)] +pub struct MemoryPoolConfig { + /// Whether the pool is enabled. + pub enabled: bool, + /// Maximum size of the pool. + pub size: crate::IggyByteSize, + /// Maximum number of buffers per bucket. + pub bucket_capacity: u32, +} + +/// A memory pool that maintains fixed-size buckets for reusing `BytesMut` buffers. +/// +/// Each bucket corresponds to a particular size in `BUCKET_SIZES`. 
The pool tracks: +/// - Buffers currently in use (`in_use`) +/// - Buffers allocated historically (`allocations`) +/// - Buffers returned to the pool (`returned`) +/// - External allocations/deallocations (buffers allocated outside the pool limit) +/// - Resizes and dropped returns +#[derive(Clone)] +pub struct MemoryPool { + /// Whether the pool is enabled. + pub is_enabled: bool, + + /// Configured maximum bytes for which the pool is responsible. + pub memory_limit: usize, + + /// Configured maximum number of buffers in each bucket. + pub bucket_capacity: usize, + + /// Array of queues for reusable buffers. Each queue can store up to `bucket_capacity` buffers. + /// The length of each queue (`buckets[i].len()`) is how many **free** buffers are currently available. + /// Free doesn't mean the buffer is allocated, it just means it's not in use. + buckets: [Arc<ArrayQueue<BytesMut>>; NUM_BUCKETS], + + /// Number of buffers **in use** for each bucket size (grow/shrink as they are acquired/released). + in_use: [Arc<AtomicUsize>; NUM_BUCKETS], + + /// Total number of buffers **ever allocated** in each bucket. Monotonically increasing. + allocations: [Arc<AtomicUsize>; NUM_BUCKETS], + + /// Total number of buffers **ever returned** to each bucket. Monotonically increasing. + returned: [Arc<AtomicUsize>; NUM_BUCKETS], + + /// Count of buffers allocated outside the pool limit (e.g., if usage exceeds `memory_limit` or no matching bucket). + external_allocations: Arc<AtomicUsize>, + + /// Count of buffers deallocated outside the pool (e.g., returning a buffer that doesn't match any bucket). + external_deallocations: Arc<AtomicUsize>, + + /// Number of resize events detected (the buffer capacity changed between acquire and release). + resize_events: Arc<AtomicUsize>, + + /// Number of returns that couldn't be stored because the relevant bucket queue was at capacity. + dropped_returns: Arc<AtomicUsize>, + + /// Flag set when the pool usage first exceeds `memory_limit`. 
Used to log a single warning. + capacity_warning: Arc<AtomicBool>, +} + +impl MemoryPool { + /// Create a new memory pool. Usually called from `init_pool` below. + pub fn new(is_enabled: bool, memory_limit: usize, bucket_capacity: usize) -> Self { + let buckets = [0; NUM_BUCKETS].map(|_| Arc::new(ArrayQueue::new(bucket_capacity))); + + if is_enabled { + info!( + "Initializing MemoryPool with {NUM_BUCKETS} buckets, each will have capacity: {bucket_capacity}." + ); + } else { + info!("MemoryPool is disabled."); + } + + Self { + is_enabled, + memory_limit, + bucket_capacity, + buckets, + in_use: [0; NUM_BUCKETS].map(|_| Arc::new(AtomicUsize::new(0))), + allocations: [0; NUM_BUCKETS].map(|_| Arc::new(AtomicUsize::new(0))), + returned: [0; NUM_BUCKETS].map(|_| Arc::new(AtomicUsize::new(0))), + external_allocations: Arc::new(AtomicUsize::new(0)), + external_deallocations: Arc::new(AtomicUsize::new(0)), + resize_events: Arc::new(AtomicUsize::new(0)), + dropped_returns: Arc::new(AtomicUsize::new(0)), + capacity_warning: Arc::new(AtomicBool::new(false)), + } + } + + /// Initialize the global pool from the given config. + pub fn init_pool(config: &MemoryPoolConfig) { + let is_enabled = config.enabled; + let memory_limit = config.size.as_bytes_usize(); + let bucket_capacity = config.bucket_capacity as usize; + + let _ = + MEMORY_POOL.get_or_init(|| MemoryPool::new(is_enabled, memory_limit, bucket_capacity)); + } + + /// Acquire a `BytesMut` buffer with at least `capacity` bytes. + /// + /// - If a bucket can fit `capacity`, try to pop from its free buffer queue; otherwise create a new buffer. + /// - If `memory_limit` would be exceeded, allocate outside the pool. + /// + /// Returns a tuple of (buffer, was_pool_allocated) where was_pool_allocated indicates if the buffer + /// was allocated from the pool (true) or externally (false). 
+ pub fn acquire_buffer(&self, capacity: usize) -> (BytesMut, bool) { + if !self.is_enabled { + return (BytesMut::with_capacity(capacity), false); + } + + let current = self.pool_current_size(); + + match self.best_fit(capacity) { + Some(idx) => { + if let Some(mut buf) = self.buckets[idx].pop() { + buf.clear(); + self.inc_bucket_in_use(idx); + return (buf, true); + } + + let new_size = BUCKET_SIZES[idx]; + if current + new_size > self.memory_limit { + self.set_capacity_warning(true); + trace!( + "Pool is at capacity. Allocating outside the pool: requested {} B, current usage {} B, limit {} B", + new_size, current, self.memory_limit + ); + self.inc_external_allocations(); + return (BytesMut::with_capacity(new_size), false); + } + + self.inc_bucket_alloc(idx); + self.inc_bucket_in_use(idx); + (BytesMut::with_capacity(new_size), true) + } + None => { + if current + capacity > self.memory_limit { + trace!( + "Pool is at capacity. Allocating outside the pool: requested {} B, current usage {} B, limit {} B", + capacity, current, self.memory_limit + ); + self.inc_external_allocations(); + return (BytesMut::with_capacity(capacity), false); + } + + self.inc_external_allocations(); + (BytesMut::with_capacity(capacity), false) + } + } + } + + /// Return a `BytesMut` buffer previously acquired from the pool. + /// + /// - If `current_capacity` differs from `original_capacity`, increments `resize_events`. + /// - If a matching bucket exists, place it back in that bucket's queue (if space is available). + /// - Otherwise, treat it as an external deallocation. + /// - The `was_pool_allocated` flag indicates if this buffer was originally allocated from the pool. 
+ pub fn release_buffer( + &self, + buffer: BytesMut, + original_capacity: usize, + was_pool_allocated: bool, + ) { + if !self.is_enabled { + return; + } + + let current_capacity = buffer.capacity(); + if current_capacity != original_capacity { + self.inc_resize_events(); + trace!( + "Buffer capacity {} != original {} when returning", + current_capacity, original_capacity + ); + } + + if was_pool_allocated && let Some(orig_idx) = self.best_fit(original_capacity) { + self.dec_bucket_in_use(orig_idx); + } + + match self.best_fit(current_capacity) { + Some(idx) => { + self.inc_bucket_return(idx); + + if self.buckets[idx].push(buffer).is_err() { + self.inc_dropped_returns(); + trace!( + "Pool full for size: {} B, dropping buffer", + BUCKET_SIZES[idx] + ); + self.set_capacity_warning(true); + } + } + None => { + self.inc_external_deallocations(); + trace!("Returned outside-of-pool buffer, capacity: {current_capacity}, dropping"); + } + } + } + + /// Write a log message summarizing current usage, allocations, etc. + /// Only logs if the pool is enabled and `pool_current_size()` is non-zero. + /// Also logs a warning if the pool usage has exceeded `memory_limit`. 
+ pub fn log_stats(&self) { + if !self.is_enabled || self.pool_current_size() == 0 { + return; + } + + let bucket_stats = (0..NUM_BUCKETS) + .filter_map(|i| { + let current_el = self.bucket_current_elements(i); + let allocated_el = self.bucket_allocated_elements(i); + + if current_el > 0 || allocated_el > 0 { + Some(format!( + "{label}:[{current_el}/{current_size}/{allocated_el}/{allocated_size}/{returns}]", + label = size_str(BUCKET_SIZES[i]), + current_el = current_el.human_count_bare(), + current_size = size_str(self.bucket_current_size(i)), + allocated_el = allocated_el.human_count_bare(), + allocated_size = size_str(self.bucket_allocated_size(i)), + returns = self.bucket_returns(i).human_count_bare(), + )) + } else { + None + } + }) + .collect::<Vec<String>>() + .join(", "); + + info!( + "Pool Buckets: {bucket_stats} (BucketLabel:[InUseCount/InUseSize/AllocCount/AllocSize/Returns])" + ); + info!( + "Pool Summary: Curr:{current}/Alloc:{allocated}/Util:{util:.1}%/Limit:{limit}/ExtAlloc:{ext_alloc}/ExtDealloc:{ext_dealloc}/DropRet:{drop_ret}/Resizes:{resize_events}/BucketCap:{cap}", + current = size_str(self.pool_current_size()), + allocated = size_str(self.pool_allocated_size()), + util = self.pool_utilization(), + limit = size_str(self.pool_maximum_size()), + ext_alloc = self.external_allocations(), + ext_dealloc = self.external_deallocations(), + drop_ret = self.dropped_returns(), + resize_events = self.resize_events(), + cap = self.bucket_capacity, + ); + + if self.should_print_warning() { + warn!("Memory pool usage exceeded limit! Consider adjusting memory_pool.size."); + self.set_capacity_warning(false); + } + } + + #[inline] + pub fn best_fit(&self, capacity: usize) -> Option<usize> { + match BUCKET_SIZES.binary_search(&capacity) { + Ok(idx) => Some(idx), + Err(idx) => { + if idx < NUM_BUCKETS { + Some(idx) + } else { + None + } + } + } + } + + /// Returns the configured maximum size of the pool, usually from config, in bytes. 
+ fn pool_maximum_size(&self) -> usize { + self.memory_limit + } + + /// Sums the sizes (in bytes) of all buffers currently in use across the pool. + fn pool_current_size(&self) -> usize { + (0..NUM_BUCKETS) + .filter(|&i| self.bucket_current_elements(i) > 0) + .map(|i| self.bucket_current_size(i)) + .sum() + } + /// Sums the sizes (in bytes) of all buffers ever allocated across the pool (historical). + fn pool_allocated_size(&self) -> usize { + let mut size = 0; + for i in 0..NUM_BUCKETS { + size += self.bucket_allocated_size(i); + } + size + } + + /// Returns pool utilization percentage. + fn pool_utilization(&self) -> f64 { + (self.pool_current_size() as f64 / self.pool_maximum_size() as f64) * 100.0 + } + + fn bucket_current_elements(&self, idx: usize) -> usize { + self.in_use[idx].load(Ordering::Acquire) + } + + fn bucket_current_size(&self, idx: usize) -> usize { + self.bucket_current_elements(idx) * BUCKET_SIZES[idx] + } + + fn bucket_allocated_elements(&self, idx: usize) -> usize { + self.allocations[idx].load(Ordering::Acquire) + } + + fn bucket_allocated_size(&self, idx: usize) -> usize { + self.bucket_allocated_elements(idx) * BUCKET_SIZES[idx] + } + + fn bucket_returns(&self, idx: usize) -> usize { + self.returned[idx].load(Ordering::Acquire) + } + + fn resize_events(&self) -> usize { + self.resize_events.load(Ordering::Acquire) + } + + fn dropped_returns(&self) -> usize { + self.dropped_returns.load(Ordering::Acquire) + } + + fn external_allocations(&self) -> usize { + self.external_allocations.load(Ordering::Acquire) + } + + fn external_deallocations(&self) -> usize { + self.external_deallocations.load(Ordering::Acquire) + } + + pub(super) fn inc_resize_events(&self) { + self.resize_events.fetch_add(1, Ordering::Release); + } + + pub(super) fn inc_dropped_returns(&self) { + self.dropped_returns.fetch_add(1, Ordering::Release); + } + + pub(super) fn inc_external_allocations(&self) { + self.external_allocations.fetch_add(1, Ordering::Release); + } + + 
pub(super) fn inc_external_deallocations(&self) { + self.external_deallocations.fetch_add(1, Ordering::Release); + } + + pub(super) fn inc_bucket_alloc(&self, idx: usize) { + self.allocations[idx].fetch_add(1, Ordering::Release); + } + + pub(super) fn inc_bucket_return(&self, idx: usize) { + self.returned[idx].fetch_add(1, Ordering::Release); + } + + pub(super) fn inc_bucket_in_use(&self, idx: usize) { + self.in_use[idx].fetch_add(1, Ordering::Release); + } + + pub(super) fn dec_bucket_in_use(&self, idx: usize) { + self.in_use[idx].fetch_sub(1, Ordering::Release); + } + + fn should_print_warning(&self) -> bool { + self.capacity_warning.load(Ordering::Acquire) + } + + fn set_capacity_warning(&self, value: bool) { + self.capacity_warning.store(value, Ordering::Release); + } +} + +/// Return a buffer to the pool by calling `release_buffer` with the original capacity. +/// This extension trait makes it easy to do `some_bytes.return_to_pool(orig_cap, was_pool_allocated)`. +pub trait BytesMutExt { + fn return_to_pool(self, original_capacity: usize, was_pool_allocated: bool); +} + +impl BytesMutExt for BytesMut { + fn return_to_pool(self, original_capacity: usize, was_pool_allocated: bool) { + memory_pool().release_buffer(self, original_capacity, was_pool_allocated); + } +} + +/// Convert a size in bytes to a string like "8KiB" or "2MiB". 
+fn size_str(size: usize) -> String { + if size >= 1024 * 1024 { + format!("{}MiB", size / (1024 * 1024)) + } else if size >= 1024 { + format!("{}KiB", size / 1024) + } else { + format!("{size}B") + } +} + +#[cfg(test)] +mod tests { + use crate::{IggyByteSize, alloc::buffer::PooledBuffer}; + + use super::*; + use std::{str::FromStr, sync::Once}; + + static TEST_INIT: Once = Once::new(); + + fn initialize_pool_for_tests() { + TEST_INIT.call_once(|| { + let config = + MemoryPoolConfig { + enabled: true, + size: IggyByteSize::from_str("4GiB").unwrap(), + bucket_capacity: 8192, + }; + MemoryPool::init_pool(&config); + }); + } + + #[test] + fn test_pooled_buffer_resize_tracking() { + initialize_pool_for_tests(); + + let pool = memory_pool(); + let initial_resize_events = pool.resize_events(); + + let small_bucket_idx = 2; + let small_bucket_size = BUCKET_SIZES[small_bucket_idx]; + + let mut buffer = PooledBuffer::with_capacity(small_bucket_size); + + assert_eq!( + buffer.capacity(), + small_bucket_size, + "Initial capacity should match requested size" + ); + + let original_in_use_before = pool.bucket_current_elements(small_bucket_idx); + + // Force a resize with data significantly larger than current bucket + // Use 256KiB data to ensure it goes to a much larger bucket + let large_bucket_idx = 6; + let large_data_size = BUCKET_SIZES[large_bucket_idx] - 1024; + buffer.put_slice(&vec![0u8; large_data_size]); + + assert!( + buffer.capacity() >= large_data_size, + "Buffer should resize to fit data" + ); + + assert_eq!( + pool.resize_events(), + initial_resize_events + 1, + "Resize event should be recorded" + ); + + assert_eq!( + pool.bucket_current_elements(small_bucket_idx), + original_in_use_before - 1, + "Original bucket in-use count should decrease by 1" + ); + + let new_bucket_idx = pool.best_fit(buffer.capacity()).unwrap(); + assert!( + new_bucket_idx > small_bucket_idx, + "Buffer should move to a larger bucket" + ); + + let new_bucket_in_use = 
pool.bucket_current_elements(new_bucket_idx); + assert!( + new_bucket_in_use > 0, + "New bucket should have buffers in use" + ); + + drop(buffer); + + assert_eq!( + pool.bucket_current_elements(small_bucket_idx), + original_in_use_before - 1, + "Small bucket count should remain decreased after drop" + ); + + assert_eq!( + pool.bucket_current_elements(new_bucket_idx), + new_bucket_in_use - 1, + "New bucket in-use count should decrease after drop" + ); + } + + #[test] + fn test_multiple_resize_operations() { + initialize_pool_for_tests(); + + let pool = memory_pool(); + let initial_resize_events = pool.resize_events(); + + let mut buffer = PooledBuffer::with_capacity(BUCKET_SIZES[1]); // 8KiB + let original_bucket_idx = pool.best_fit(buffer.capacity()).unwrap(); + let original_bucket_in_use = pool.bucket_current_elements(original_bucket_idx); + + let first_resize_size = BUCKET_SIZES[4]; // 64KiB + buffer.put_slice(&vec![0u8; first_resize_size]); + + assert!( + buffer.capacity() >= first_resize_size, + "Buffer should increase capacity after first resize" + ); + assert_eq!( + pool.resize_events(), + initial_resize_events + 1, + "One resize event should be recorded" + ); + + let mid_bucket_idx = pool.best_fit(buffer.capacity()).unwrap(); + assert!( + mid_bucket_idx > original_bucket_idx, + "Buffer should move to a larger bucket after first resize" + ); + + let mid_bucket_in_use = pool.bucket_current_elements(mid_bucket_idx); + assert_eq!( + pool.bucket_current_elements(original_bucket_idx), + original_bucket_in_use - 1, + "Original bucket count should decrease after first resize" + ); + + let second_resize_size = BUCKET_SIZES[9]; // 1MiB + buffer.put_slice(&vec![0u8; second_resize_size]); + + assert!( + buffer.capacity() >= second_resize_size, + "Buffer should increase capacity after second resize" + ); + assert_eq!( + pool.resize_events(), + initial_resize_events + 2, + "Two resize events should be recorded" + ); + + let final_bucket_idx = 
pool.best_fit(buffer.capacity()).unwrap(); + assert!( + final_bucket_idx > mid_bucket_idx, + "Buffer should move to an even larger bucket after second resize" + ); + + let final_bucket_in_use = pool.bucket_current_elements(final_bucket_idx); + assert_eq!( + pool.bucket_current_elements(mid_bucket_idx), + mid_bucket_in_use - 1, + "Mid bucket count should decrease after second resize" + ); + + drop(buffer); + + assert_eq!( + pool.bucket_current_elements(original_bucket_idx), + original_bucket_in_use - 1, + "Original bucket should remain decreased" + ); + + assert_eq!( + pool.bucket_current_elements(mid_bucket_idx), + mid_bucket_in_use - 1, + "Mid bucket should remain decreased" + ); + + assert_eq!( + pool.bucket_current_elements(final_bucket_idx), + final_bucket_in_use - 1, + "Final bucket count should decrease after drop" + ); + } + + #[test] + fn test_different_resize_methods() { + initialize_pool_for_tests(); + + let pool = memory_pool(); + + // Test put_slice + { + let initial_events = pool.resize_events(); + let mut buffer = PooledBuffer::with_capacity(4 * 1024); + let orig_bucket_idx = pool.best_fit(buffer.capacity()).unwrap(); + let orig_in_use = pool.bucket_current_elements(orig_bucket_idx); + + buffer.put_slice(&vec![0u8; 64 * 1024]); + + assert_eq!( + pool.resize_events(), + initial_events + 1, + "put_slice should trigger resize event" + ); + assert_eq!( + pool.bucket_current_elements(orig_bucket_idx), + orig_in_use - 1, + "put_slice should update bucket accounting" + ); + } + + // Test put_bytes + { + let initial_events = pool.resize_events(); + let mut buffer = PooledBuffer::with_capacity(4 * 1024); + let orig_bucket_idx = pool.best_fit(buffer.capacity()).unwrap(); + let orig_in_use = pool.bucket_current_elements(orig_bucket_idx); + + buffer.put_bytes(0, 64 * 1024); // 64KiB of zeros + + assert_eq!( + pool.resize_events(), + initial_events + 1, + "put_bytes should trigger resize event" + ); + assert_eq!( + pool.bucket_current_elements(orig_bucket_idx), + 
orig_in_use - 1, + "put_bytes should update bucket accounting" + ); + } + + // Test extend_from_slice + { + let initial_events = pool.resize_events(); + let mut buffer = PooledBuffer::with_capacity(4 * 1024); + let orig_bucket_idx = pool.best_fit(buffer.capacity()).unwrap(); + let orig_in_use = pool.bucket_current_elements(orig_bucket_idx); + + buffer.extend_from_slice(&vec![0u8; 64 * 1024]); + + assert_eq!( + pool.resize_events(), + initial_events + 1, + "extend_from_slice should trigger resize event" + ); + assert_eq!( + pool.bucket_current_elements(orig_bucket_idx), + orig_in_use - 1, + "extend_from_slice should update bucket accounting" + ); + } + + // Test reserve + { + let initial_events = pool.resize_events(); + let mut buffer = PooledBuffer::with_capacity(4 * 1024); + let orig_bucket_idx = pool.best_fit(buffer.capacity()).unwrap(); + let orig_in_use = pool.bucket_current_elements(orig_bucket_idx); + + buffer.reserve(64 * 1024); + + if buffer.capacity() > 4 * 1024 { + assert_eq!( + pool.resize_events(), + initial_events + 1, + "reserve should trigger resize event when capacity changes" + ); + assert_eq!( + pool.bucket_current_elements(orig_bucket_idx), + orig_in_use - 1, + "reserve should update bucket accounting when capacity changes" + ); + } + } + } +} diff --git a/core/common/src/alloc/mod.rs b/core/common/src/alloc/mod.rs new file mode 100644 index 000000000..4502a53ce --- /dev/null +++ b/core/common/src/alloc/mod.rs @@ -0,0 +1,21 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +pub(crate) mod buffer; +pub(crate) mod memory_pool; + diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs index 032ea4bb9..6ed2265e7 100644 --- a/core/common/src/lib.rs +++ b/core/common/src/lib.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +mod alloc; mod certificates; mod commands; mod configs; @@ -27,6 +28,7 @@ pub use error::client_error::ClientError; pub use error::iggy_error::{IggyError, IggyErrorDiscriminants}; // Locking is feature gated, thus only mod level re-export. pub mod locking; +pub use alloc::memory_pool::MemoryPoolConfig; pub use certificates::generate_self_signed_certificate; pub use commands::consumer_groups::*; pub use commands::consumer_offsets::*; @@ -87,6 +89,7 @@ pub use types::topic::*; pub use types::user::user_identity_info::*; pub use types::user::user_info::*; pub use types::user::user_status::*; +pub use types::sender::{Sender, SenderKind, TcpSender, TcpTlsSender, QuicSender, WebSocketSender, WebSocketTlsSender}; pub use utils::byte_size::IggyByteSize; pub use utils::checksum::*; pub use utils::crypto::*; diff --git a/core/common/src/types/mod.rs b/core/common/src/types/mod.rs index f0d70cf9c..0eb0da82b 100644 --- a/core/common/src/types/mod.rs +++ b/core/common/src/types/mod.rs @@ -34,3 +34,4 @@ pub(crate) mod stats; pub(crate) mod stream; pub(crate) mod topic; pub(crate) mod user; +pub(crate) mod sender; diff --git a/core/common/src/types/sender/mod.rs b/core/common/src/types/sender/mod.rs new file mode 100644 index 000000000..fe8ec7cf2 --- 
/dev/null
+++ b/core/common/src/types/sender/mod.rs
@@ -0,0 +1,267 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+mod quic_sender;
+mod tcp_sender;
+mod tcp_tls_sender;
+mod websocket_sender;
+mod websocket_tls_sender;
+
+pub use quic_sender::QuicSender;
+pub use tcp_sender::TcpSender;
+pub use tcp_tls_sender::TcpTlsSender;
+pub use websocket_sender::WebSocketSender;
+pub use websocket_tls_sender::WebSocketTlsSender;
+
+use crate::IggyError;
+use crate::alloc::buffer::PooledBuffer;
+use compio::buf::IoBufMut;
+use compio::io::{AsyncReadExt, AsyncWriteExt};
+use compio::net::TcpStream;
+use compio::BufResult;
+use compio_quic::{RecvStream, SendStream};
+use compio_tls::TlsStream;
+use std::future::Future;
+use tracing::debug;
+
+// Generates inherent `async fn`s on `SenderKind` that forward each call to
+// the same-named `Sender` method of whichever transport variant is active.
+macro_rules! forward_async_methods {
+    (
+        $(
+            async fn $method_name:ident
+            $(<$($generic:ident $(: $bound:path)?),+>)?
+            (
+                &mut self $(, $arg:ident : $arg_ty:ty )*
+            ) -> $ret:ty ;
+        )*
+    ) => {
+        $(
+            pub async fn $method_name
+            $(<$($generic $(: $bound)?),+>)?
+            (&mut self, $( $arg: $arg_ty ),* ) -> $ret {
+                match self {
+                    Self::Tcp(s) => s.$method_name$(::<$($generic),+>)?($( $arg ),*).await,
+                    Self::TcpTls(s) => s.$method_name$(::<$($generic),+>)?($( $arg ),*).await,
+                    Self::Quic(s) => s.$method_name$(::<$($generic),+>)?($( $arg ),*).await,
+                    Self::WebSocket(s) => s.$method_name$(::<$($generic),+>)?($( $arg ),*).await,
+                    Self::WebSocketTls(s) => s.$method_name$(::<$($generic),+>)?($( $arg ),*).await,
+                }
+            }
+        )*
+    }
+}
+
+/// Transport-agnostic connection handle used to read requests and write responses.
+///
+/// Responses are written as status bytes, a 4-byte length, and the payload; the
+/// non-vectored helpers encode that length as a little-endian `u32`.
+pub trait Sender {
+    /// Fills `buffer` from the connection and hands it back alongside the outcome.
+    fn read<B: IoBufMut>(
+        &mut self,
+        buffer: B,
+    ) -> impl Future<Output = (Result<(), IggyError>, B)>;
+    /// Sends an OK status with an empty payload.
+    fn send_empty_ok_response(&mut self) -> impl Future<Output = Result<(), IggyError>>;
+    /// Sends an OK status followed by `payload`.
+    fn send_ok_response(&mut self, payload: &[u8]) -> impl Future<Output = Result<(), IggyError>>;
+    /// Sends an OK status, the caller-encoded `length`, and `slices` in order.
+    fn send_ok_response_vectored(
+        &mut self,
+        length: &[u8],
+        slices: Vec<PooledBuffer>,
+    ) -> impl Future<Output = Result<(), IggyError>>;
+    /// Sends the code of `error` as the status, with an empty payload.
+    fn send_error_response(
+        &mut self,
+        error: IggyError,
+    ) -> impl Future<Output = Result<(), IggyError>>;
+    /// Shuts the connection down; a transport may implement this as a no-op.
+    fn shutdown(&mut self) -> impl Future<Output = Result<(), std::io::Error>>;
+}
+
+/// The concrete transport behind a connection; calls are forwarded to the active
+/// variant by `match`.
+// NOTE(review): variant sizes differ; the lint is silenced instead of boxing.
+#[allow(clippy::large_enum_variant)]
+pub enum SenderKind {
+    Tcp(TcpSender),
+    TcpTls(TcpTlsSender),
+    Quic(QuicSender),
+    WebSocket(WebSocketSender),
+    WebSocketTls(WebSocketTlsSender),
+}
+
+impl SenderKind {
+    /// Wraps a plain TCP stream.
+    pub fn get_tcp_sender(stream: TcpStream) -> Self {
+        Self::Tcp(TcpSender { stream })
+    }
+
+    /// Wraps a TLS-over-TCP stream.
+    pub fn get_tcp_tls_sender(stream: TlsStream<TcpStream>) -> Self {
+        Self::TcpTls(TcpTlsSender { stream })
+    }
+
+    /// Wraps a QUIC send/receive stream pair.
+    pub fn get_quic_sender(send_stream: SendStream, recv_stream: RecvStream) -> Self {
+        Self::Quic(QuicSender {
+            send: send_stream,
+            recv: recv_stream,
+        })
+    }
+
+    /// Wraps an already-constructed WebSocket sender.
+    pub fn get_websocket_sender(stream: WebSocketSender) -> Self {
+        Self::WebSocket(stream)
+    }
+
+    /// Wraps an already-constructed WebSocket-over-TLS sender.
+    pub fn get_websocket_tls_sender(stream: WebSocketTlsSender) -> Self {
+        Self::WebSocketTls(stream)
+    }
+
+    forward_async_methods! {
+        async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<(), IggyError>, B);
+        async fn send_empty_ok_response(&mut self) -> Result<(), IggyError>;
+        async fn send_ok_response(&mut self, payload: &[u8]) -> Result<(), IggyError>;
+        async fn send_ok_response_vectored(&mut self, length: &[u8], slices: Vec<PooledBuffer>) -> Result<(), IggyError>;
+        async fn send_error_response(&mut self, error: IggyError) -> Result<(), IggyError>;
+        async fn shutdown(&mut self) -> Result<(), std::io::Error>;
+    }
+}
+
+/// Status bytes written ahead of every successful response.
+const STATUS_OK: &[u8] = &[0; 4];
+
+/// Fills `buffer` from `stream` with `read_exact`.
+///
+/// An unexpected EOF is reported as [`IggyError::ConnectionClosed`]; any other
+/// I/O error as [`IggyError::TcpError`]. The buffer is always handed back.
+pub(crate) async fn read<T, B>(stream: &mut T, buffer: B) -> (Result<(), IggyError>, B)
+where
+    T: AsyncReadExt + AsyncWriteExt + Unpin,
+    B: IoBufMut,
+{
+    let BufResult(result, buffer) = stream.read_exact(buffer).await;
+    let result = result.map(|_| ()).map_err(|error| {
+        if error.kind() == std::io::ErrorKind::UnexpectedEof {
+            IggyError::ConnectionClosed
+        } else {
+            IggyError::TcpError
+        }
+    });
+    (result, buffer)
+}
+
+/// Sends an OK status with an empty payload.
+pub(crate) async fn send_empty_ok_response<T>(stream: &mut T) -> Result<(), IggyError>
+where
+    T: AsyncReadExt + AsyncWriteExt + Unpin,
+{
+    send_ok_response(stream, &[]).await
+}
+
+/// Sends an OK status followed by `payload`.
+pub(crate) async fn send_ok_response<T>(stream: &mut T, payload: &[u8]) -> Result<(), IggyError>
+where
+    T: AsyncReadExt + AsyncWriteExt + Unpin,
+{
+    send_response(stream, STATUS_OK, payload).await
+}
+
+/// Sends an OK status, the caller-encoded `length`, and `slices`.
+pub(crate) async fn send_ok_response_vectored<T>(
+    stream: &mut T,
+    length: &[u8],
+    slices: Vec<PooledBuffer>,
+) -> Result<(), IggyError>
+where
+    T: AsyncReadExt + AsyncWriteExt + Unpin,
+{
+    send_response_vectored(stream, STATUS_OK, length, slices).await
+}
+
+/// Sends the code of `error` as the status, with an empty payload.
+pub(crate) async fn send_error_response<T>(
+    stream: &mut T,
+    error: IggyError,
+) -> Result<(), IggyError>
+where
+    T: AsyncReadExt + AsyncWriteExt + Unpin,
+{
+    send_response(stream, &error.as_code().to_le_bytes(), &[]).await
+}
+
+/// Writes `status`, the little-endian `u32` byte length of `payload`, and
+/// `payload` with a single `write_all`.
+pub(crate) async fn send_response<T>(
+    stream: &mut T,
+    status: &[u8],
+    payload: &[u8],
+) -> Result<(), IggyError>
+where
+    T: AsyncReadExt + AsyncWriteExt + Unpin,
+{
+    debug!(
+        "Sending response of len: {} with status: {:?}...",
+        payload.len(),
+        status
+    );
+    // NOTE(review): a payload longer than u32::MAX bytes would be truncated here.
+    let length = (payload.len() as u32).to_le_bytes();
+    stream
+        .write_all([status, &length, payload].concat())
+        .await
+        .0
+        .map_err(|_| IggyError::TcpError)?;
+    debug!("Sent response with status: {:?}", status);
+    Ok(())
+}
+
+/// Writes `status`, `length`, and every slice of `slices` with `write_vectored_all`.
+pub(crate) async fn send_response_vectored<T>(
+    stream: &mut T,
+    status: &[u8],
+    length: &[u8],
+    mut slices: Vec<PooledBuffer>,
+) -> Result<(), IggyError>
+where
+    T: AsyncReadExt + AsyncWriteExt + Unpin,
+{
+    // Report the payload size in bytes rather than the number of slices, and log
+    // `status` as raw bytes (as `send_response` does) so a status of any length is
+    // accepted without a panicking conversion.
+    let payload_len: usize = slices.iter().map(|slice| slice.len()).sum();
+    debug!(
+        "Sending vectored response of len: {} with status: {:?}...",
+        payload_len,
+        status
+    );
+    let status_buf = PooledBuffer::from(status);
+    let length_buf = PooledBuffer::from(length);
+    slices.splice(0..0, [status_buf, length_buf]);
+    stream
+        .write_vectored_all(slices)
+        .await
+        .0
+        .map_err(|_| IggyError::TcpError)?;
+    debug!("Sent response with status: {:?}", status);
+    Ok(())
+}
diff --git a/core/common/src/types/sender/quic_sender.rs b/core/common/src/types/sender/quic_sender.rs
new file mode 100644
index 000000000..412f608ac
--- /dev/null
+++ b/core/common/src/types/sender/quic_sender.rs
@@ -0,0 +1,142 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::{PooledBuffer, STATUS_OK, Sender};
+use crate::IggyError;
+use compio::buf::IoBufMut;
+use compio::io::AsyncReadExt;
+use compio::BufResult;
+use compio_quic::{RecvStream, SendStream};
+use err_trail::ErrContext;
+use tracing::{debug, error};
+
+const COMPONENT: &str = "QUIC";
+
+/// [`Sender`] over a QUIC send/receive stream pair.
+#[derive(Debug)]
+pub struct QuicSender {
+    pub(crate) send: SendStream,
+    pub(crate) recv: RecvStream,
+}
+
+impl Sender for QuicSender {
+    /// Reads data from the QUIC stream directly into the buffer; any failure is
+    /// logged and reported as [`IggyError::QuicError`].
+    async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<(), IggyError>, B) {
+        let BufResult(result, buffer) =
+            <RecvStream as AsyncReadExt>::read_exact(&mut self.recv, buffer).await;
+        if let Err(error) = result {
+            error!("Failed to read from the stream: {:?}", error);
+            return (Err(IggyError::QuicError), buffer);
+        }
+        (Ok(()), buffer)
+    }
+
+    async fn send_empty_ok_response(&mut self) -> Result<(), IggyError> {
+        self.send_ok_response(&[]).await
+    }
+
+    async fn send_ok_response(&mut self, payload: &[u8]) -> Result<(), IggyError> {
+        self.send_response(STATUS_OK, payload).await
+    }
+
+    async fn send_error_response(&mut self, error: IggyError) -> Result<(), IggyError> {
+        self.send_response(&error.as_code().to_le_bytes(), &[]).await
+    }
+
+    /// No-op: every response already finishes the send stream.
+    async fn shutdown(&mut self) -> Result<(), std::io::Error> {
+        Ok(())
+    }
+
+    /// Writes the status and `length` header, each non-empty slice, then finishes the stream.
+    async fn send_ok_response_vectored(
+        &mut self,
+        length: &[u8],
+        slices: Vec<PooledBuffer>,
+    ) -> Result<(), IggyError> {
+        debug!("Sending vectored response with status: {:?}...", STATUS_OK);
+
+        let headers = [STATUS_OK, length].concat();
+        self.send
+            .write_all(&headers)
+            .await
+            .with_error(|error| {
+                format!("{COMPONENT} (error: {error}) - failed to write headers to stream")
+            })
+            .map_err(|_| IggyError::QuicError)?;
+
+        let mut total_bytes_written = 0;
+        for slice in slices {
+            let slice_data = &*slice;
+            // Skip empty slices instead of issuing zero-length writes.
+            if slice_data.is_empty() {
+                continue;
+            }
+            self.send
+                .write_all(slice_data)
+                .await
+                .with_error(|error| {
+                    format!("{COMPONENT} (error: {error}) - failed to write slice to stream")
+                })
+                .map_err(|_| IggyError::QuicError)?;
+            total_bytes_written += slice_data.len();
+        }
+
+        debug!(
+            "Sent vectored response: {} bytes of payload",
+            total_bytes_written
+        );
+        self.finish_send()?;
+        debug!("Sent vectored response with status: {:?}", STATUS_OK);
+        Ok(())
+    }
+}
+
+impl QuicSender {
+    /// Writes status, length, and payload as one buffer, then finishes the send stream.
+    async fn send_response(&mut self, status: &[u8], payload: &[u8]) -> Result<(), IggyError> {
+        debug!(
+            "Sending response of len: {} with status: {:?}...",
+            payload.len(),
+            status
+        );
+        // NOTE(review): a payload longer than u32::MAX bytes would be truncated here.
+        let length = (payload.len() as u32).to_le_bytes();
+        self.send
+            .write_all(&[status, &length, payload].concat())
+            .await
+            .with_error(|error| {
+                format!("{COMPONENT} (error: {error}) - failed to write buffer to the stream")
+            })
+            .map_err(|_| IggyError::QuicError)?;
+        self.finish_send()?;
+        debug!("Sent response with status: {:?}", status);
+        Ok(())
+    }
+
+    /// Finishes the send stream, mapping any failure to [`IggyError::QuicError`].
+    fn finish_send(&mut self) -> Result<(), IggyError> {
+        self.send
+            .finish()
+            .with_error(|error| {
+                format!("{COMPONENT} (error: {error}) - failed to finish send stream")
+            })
+            .map_err(|_| IggyError::QuicError)
+    }
+}
diff --git a/core/common/src/types/sender/tcp_sender.rs b/core/common/src/types/sender/tcp_sender.rs
new file mode 100644
index 000000000..b3596039b
--- /dev/null
+++ b/core/common/src/types/sender/tcp_sender.rs
@@ -0,0 +1,66 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ *
or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::{PooledBuffer, Sender};
+use crate::IggyError;
+use compio::buf::IoBufMut;
+use compio::io::AsyncWrite;
+use compio::net::TcpStream;
+use err_trail::ErrContext;
+
+const COMPONENT: &str = "TCP";
+
+/// [`Sender`] over a plain TCP stream. Reads and responses are delegated to the
+/// shared stream helpers in the parent module.
+#[derive(Debug)]
+pub struct TcpSender {
+    pub(crate) stream: TcpStream,
+}
+
+impl Sender for TcpSender {
+    async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<(), IggyError>, B) {
+        super::read(&mut self.stream, buffer).await
+    }
+
+    async fn send_empty_ok_response(&mut self) -> Result<(), IggyError> {
+        super::send_empty_ok_response(&mut self.stream).await
+    }
+
+    async fn send_ok_response(&mut self, payload: &[u8]) -> Result<(), IggyError> {
+        super::send_ok_response(&mut self.stream, payload).await
+    }
+
+    async fn send_ok_response_vectored(
+        &mut self,
+        length: &[u8],
+        slices: Vec<PooledBuffer>,
+    ) -> Result<(), IggyError> {
+        super::send_ok_response_vectored(&mut self.stream, length, slices).await
+    }
+
+    async fn send_error_response(&mut self, error: IggyError) -> Result<(), IggyError> {
+        super::send_error_response(&mut self.stream, error).await
+    }
+
+    async fn shutdown(&mut self) -> Result<(), std::io::Error> {
+        let outcome = self.stream.shutdown().await;
+        outcome.with_error(|error| {
+            format!("{COMPONENT} (error: {error}) - failed to shutdown TCP stream")
+        })
+    }
+}
diff --git a/core/common/src/types/sender/tcp_tls_sender.rs b/core/common/src/types/sender/tcp_tls_sender.rs
new file mode 100644
index 000000000..d2c070a7a
--- /dev/null
+++ b/core/common/src/types/sender/tcp_tls_sender.rs
@@ -0,0 +1,87 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::{PooledBuffer, Sender};
+use crate::IggyError;
+use compio::buf::IoBufMut;
+use compio::io::AsyncWrite;
+use compio::net::TcpStream;
+use compio_tls::TlsStream;
+use err_trail::ErrContext;
+
+const COMPONENT: &str = "TCP";
+
+/// [`Sender`] over TLS-on-TCP. Responses are written by the shared helpers in the
+/// parent module and then explicitly flushed, since the TLS stream may buffer writes.
+#[derive(Debug)]
+pub struct TcpTlsSender {
+    pub(crate) stream: TlsStream<TcpStream>,
+}
+
+impl TcpTlsSender {
+    /// Flushes the TLS stream after a response, mapping failures to [`IggyError::TcpError`].
+    async fn flush_stream(&mut self) -> Result<(), IggyError> {
+        self.stream
+            .flush()
+            .await
+            .with_error(|e| format!("failed to flush TCP stream after sending response: {e}"))
+            .map_err(|_| IggyError::TcpError)
+    }
+}
+
+impl Sender for TcpTlsSender {
+    /// Reads through the shared `super::read` helper.
+    async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<(), IggyError>, B) {
+        super::read(&mut self.stream, buffer).await
+    }
+
+    async fn send_empty_ok_response(&mut self) -> Result<(), IggyError> {
+        super::send_empty_ok_response(&mut self.stream).await?;
+        self.flush_stream().await
+    }
+
+    async fn send_ok_response(&mut self, payload: &[u8]) -> Result<(), IggyError> {
+        super::send_ok_response(&mut self.stream, payload).await?;
+        self.flush_stream().await
+    }
+
+    async fn send_error_response(&mut self, error: IggyError) -> Result<(), IggyError> {
+        super::send_error_response(&mut self.stream, error).await?;
+        self.flush_stream().await
+    }
+
+    /// Shuts the TLS stream down; any I/O error is returned to the caller.
+    async fn shutdown(&mut self) -> Result<(), std::io::Error> {
+        self.stream
+            .shutdown()
+            .await
+            .with_error(|error| {
+                format!("{COMPONENT} (error: {error}) - failed to shutdown TCP TLS stream")
+            })
+    }
+
+    /// Writes the vectored response through the shared helper, then flushes it.
+    async fn send_ok_response_vectored(
+        &mut self,
+        length: &[u8],
+        slices: Vec<PooledBuffer>,
+    ) -> Result<(), IggyError> {
+        super::send_ok_response_vectored(&mut self.stream, length, slices).await?;
+        self.flush_stream().await
+    }
+}
diff --git
a/core/common/src/types/sender/websocket_sender.rs b/core/common/src/types/sender/websocket_sender.rs
new file mode 100644
index 000000000..7804660a7
--- /dev/null
+++ b/core/common/src/types/sender/websocket_sender.rs
@@ -0,0 +1,223 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::{STATUS_OK, Sender};
+use crate::IggyError;
+use crate::alloc::buffer::PooledBuffer;
+use bytes::{BufMut, BytesMut};
+use compio::buf::IoBufMut;
+use compio::net::TcpStream;
+use compio_ws::TungsteniteError;
+use compio_ws::{WebSocketMessage as Message, WebSocketStream};
+use std::ptr;
+use tracing::{debug, warn};
+
+const READ_BUFFER_CAPACITY: usize = 8192;
+const WRITE_BUFFER_CAPACITY: usize = 8192;
+
+/// Maps a failed WebSocket send to an [`IggyError`].
+///
+/// A closed connection or a broken pipe becomes [`IggyError::ConnectionClosed`] (a
+/// broken pipe also logs `broken_pipe_message` at warn level); anything else becomes
+/// [`IggyError::TcpError`].
+pub(super) fn map_send_error(error: TungsteniteError, broken_pipe_message: &str) -> IggyError {
+    match error {
+        TungsteniteError::ConnectionClosed | TungsteniteError::AlreadyClosed => {
+            IggyError::ConnectionClosed
+        }
+        TungsteniteError::Io(ref io_err) if io_err.kind() == std::io::ErrorKind::BrokenPipe => {
+            warn!("{broken_pipe_message}");
+            IggyError::ConnectionClosed
+        }
+        _ => IggyError::TcpError,
+    }
+}
+
+/// [`Sender`] over a plain (non-TLS) WebSocket connection.
+///
+/// Incoming binary frames accumulate in `read_buffer` until a read can be satisfied;
+/// responses are staged in `write_buffer` and sent as one binary message.
+pub struct WebSocketSender {
+    pub(crate) stream: WebSocketStream<TcpStream>,
+    pub(crate) read_buffer: BytesMut,
+    pub(crate) write_buffer: BytesMut,
+}
+
+impl WebSocketSender {
+    pub fn new(stream: WebSocketStream<TcpStream>) -> Self {
+        Self {
+            stream,
+            read_buffer: BytesMut::with_capacity(READ_BUFFER_CAPACITY),
+            write_buffer: BytesMut::with_capacity(WRITE_BUFFER_CAPACITY),
+        }
+    }
+
+    /// Sends everything staged in `write_buffer` as a single binary message.
+    async fn flush_write_buffer(&mut self) -> Result<(), IggyError> {
+        if self.write_buffer.is_empty() {
+            return Ok(());
+        }
+        let data = self.write_buffer.split().freeze();
+        // Log only the size: formatting the bytes would copy every response and put
+        // payload contents into the logs.
+        debug!("WebSocket sending {} bytes", data.len());
+
+        self.stream.send(Message::Binary(data)).await.map_err(|e| {
+            debug!("WebSocket send error: {:?}", e);
+            map_send_error(e, "Broken pipe detected (client closed connection)")
+        })
+    }
+}
+
+impl std::fmt::Debug for WebSocketSender {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("WebSocketSender").finish()
+    }
+}
+
+impl Sender for WebSocketSender {
+    /// Fills `buffer` to its capacity from buffered and newly received binary frames,
+    /// answering pings along the way.
+    async fn read<B: IoBufMut>(&mut self, mut buffer: B) -> (Result<(), IggyError>, B) {
+        let required_len = buffer.buf_capacity();
+        if required_len == 0 {
+            return (Ok(()), buffer);
+        }
+
+        while self.read_buffer.len() < required_len {
+            match self.stream.read().await {
+                Ok(Message::Binary(data)) => {
+                    self.read_buffer.extend_from_slice(&data);
+                }
+                Ok(Message::Close(_)) => {
+                    return (Err(IggyError::ConnectionClosed), buffer);
+                }
+                Ok(Message::Ping(data)) => {
+                    if self.stream.send(Message::Pong(data)).await.is_err() {
+                        return (Err(IggyError::ConnectionClosed), buffer);
+                    }
+                }
+                Ok(_) => { /* Ignore other message types */ }
+                Err(_) => {
+                    return (Err(IggyError::ConnectionClosed), buffer);
+                }
+            }
+        }
+
+        let data_to_copy = self.read_buffer.split_to(required_len);
+
+        // SAFETY: `data_to_copy` holds exactly `required_len` bytes and `buffer` has a
+        // capacity of `required_len`, so the copy stays in bounds. The source comes from
+        // `self.read_buffer`, a different allocation from `buffer`, so they cannot
+        // overlap, and afterwards the first `required_len` bytes of `buffer` are set.
+        unsafe {
+            ptr::copy_nonoverlapping(data_to_copy.as_ptr(), buffer.as_buf_mut_ptr(), required_len);
+            buffer.set_buf_init(required_len);
+        }
+
+        (Ok(()), buffer)
+    }
+
+    async fn send_empty_ok_response(&mut self) -> Result<(), IggyError> {
+        self.send_ok_response(&[]).await
+    }
+
+    async fn send_ok_response(&mut self, payload: &[u8]) -> Result<(), IggyError> {
+        debug!(
+            "Sending WebSocket response with status: OK, payload length: {}",
+            payload.len()
+        );
+
+        // NOTE(review): a payload longer than u32::MAX bytes would be truncated here.
+        let length = (payload.len() as u32).to_le_bytes();
+        let total_size = STATUS_OK.len() + length.len() + payload.len();
+
+        if self.write_buffer.len() + total_size > self.write_buffer.capacity() {
+            self.flush_write_buffer().await?;
+        }
+
+        self.write_buffer.put_slice(STATUS_OK);
+        self.write_buffer.put_slice(&length);
+        self.write_buffer.put_slice(payload);
+
+        self.flush_write_buffer().await
+    }
+
+    async fn send_error_response(&mut self, error: IggyError) -> Result<(), IggyError> {
+        let status = &error.as_code().to_le_bytes();
+        debug!("Sending WebSocket error response with status: {:?}", status);
+        let length = 0u32.to_le_bytes();
+        let total_size = status.len() + length.len();
+
+        if self.write_buffer.len() + total_size > self.write_buffer.capacity() {
+            self.flush_write_buffer().await?;
+        }
+        self.write_buffer.put_slice(status);
+        self.write_buffer.put_slice(&length);
+        self.flush_write_buffer().await
+    }
+
+    /// Flushes pending output, then closes the WebSocket; a connection that is
+    /// already closed counts as a successful shutdown.
+    async fn shutdown(&mut self) -> Result<(), std::io::Error> {
+        self.flush_write_buffer()
+            .await
+            .map_err(|e| std::io::Error::other(e.to_string()))?;
+
+        match self.stream.close(None).await {
+            Ok(_) => Ok(()),
+            Err(e) => match e {
+                TungsteniteError::ConnectionClosed | TungsteniteError::AlreadyClosed => {
+                    debug!("WebSocket connection already closed: {}", e);
+                    Ok(())
+                }
+                _ => Err(std::io::Error::other(format!(
+                    "Failed to close WebSocket connection: {}",
+                    e
+                ))),
+            },
+        }
+    }
+
+    /// Flushes anything already staged, then sends the status, `length`, and all
+    /// `slices` together as one binary message.
+    async fn send_ok_response_vectored(
+        &mut self,
+        length: &[u8],
+        slices: Vec<PooledBuffer>,
+    ) -> Result<(), IggyError> {
+        self.flush_write_buffer().await?;
+
+        let total_payload_size = slices.iter().map(|s| s.len()).sum::<usize>();
+        let total_size = STATUS_OK.len() + length.len() + total_payload_size;
+
+        let mut response_bytes = BytesMut::with_capacity(total_size);
+        response_bytes.put_slice(STATUS_OK);
+        response_bytes.put_slice(length);
+        for slice in slices {
+            response_bytes.put_slice(&slice);
+        }
+
+        self.stream
+            .send(Message::Binary(response_bytes.freeze()))
+            .await
+            .map_err(|e| {
+                map_send_error(e, "Broken pipe in vectored send - client closed connection")
+            })
+    }
+}
diff --git a/core/common/src/types/sender/websocket_tls_sender.rs b/core/common/src/types/sender/websocket_tls_sender.rs
new file mode 100644
index 000000000..0ffc77aba
--- /dev/null
+++ b/core/common/src/types/sender/websocket_tls_sender.rs
@@ -0,0 +1,209 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::websocket_sender::map_send_error;
+use super::{STATUS_OK, Sender};
+use crate::IggyError;
+use crate::alloc::buffer::PooledBuffer;
+use bytes::{BufMut, BytesMut};
+use compio::buf::IoBufMut;
+use compio::net::TcpStream;
+use compio_tls::TlsStream;
+use compio_ws::TungsteniteError;
+use compio_ws::{WebSocketMessage as Message, WebSocketStream};
+use std::ptr;
+use tracing::debug;
+
+const READ_BUFFER_CAPACITY: usize = 8192;
+const WRITE_BUFFER_CAPACITY: usize = 8192;
+
+/// [`Sender`] over a WebSocket connection carried on TLS.
+///
+/// Incoming binary frames accumulate in `read_buffer` until a read can be satisfied;
+/// responses are staged in `write_buffer` and sent as one binary message.
+pub struct WebSocketTlsSender {
+    pub(crate) stream: WebSocketStream<TlsStream<TcpStream>>,
+    pub(crate) read_buffer: BytesMut,
+    pub(crate) write_buffer: BytesMut,
+}
+
+impl WebSocketTlsSender {
+    pub fn new(stream: WebSocketStream<TlsStream<TcpStream>>) -> Self {
+        Self {
+            stream,
+            read_buffer: BytesMut::with_capacity(READ_BUFFER_CAPACITY),
+            write_buffer: BytesMut::with_capacity(WRITE_BUFFER_CAPACITY),
+        }
+    }
+
+    /// Sends everything staged in `write_buffer` as a single binary message.
+    ///
+    /// A closed connection or broken pipe is reported as [`IggyError::ConnectionClosed`];
+    /// any other send failure as [`IggyError::TcpError`].
+    async fn flush_write_buffer(&mut self) -> Result<(), IggyError> {
+        if self.write_buffer.is_empty() {
+            return Ok(());
+        }
+        let data = self.write_buffer.split().freeze();
+        self.stream
+            .send(Message::Binary(data))
+            .await
+            .map_err(|e| map_send_error(e, "Broken pipe detected (client closed connection)"))
+    }
+}
+
+impl std::fmt::Debug for WebSocketTlsSender {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("WebSocketTlsSender").finish()
+    }
+}
+
+impl Sender for WebSocketTlsSender {
+    /// Fills `buffer` to its capacity from buffered and newly received binary frames,
+    /// answering pings along the way.
+    async fn read<B: IoBufMut>(&mut self, mut buffer: B) -> (Result<(), IggyError>, B) {
+        let required_len = buffer.buf_capacity();
+        if required_len == 0 {
+            return (Ok(()), buffer);
+        }
+
+        while self.read_buffer.len() < required_len {
+            match self.stream.read().await {
+                Ok(Message::Binary(data)) => {
+                    self.read_buffer.extend_from_slice(&data);
+                }
+                Ok(Message::Close(_)) => {
+                    return (Err(IggyError::ConnectionClosed), buffer);
+                }
+                Ok(Message::Ping(data)) => {
+                    if self.stream.send(Message::Pong(data)).await.is_err() {
+                        return (Err(IggyError::ConnectionClosed), buffer);
+                    }
+                }
+                Ok(_) => { /* Ignore other message types */ }
+                Err(_) => {
+                    return (Err(IggyError::ConnectionClosed), buffer);
+                }
+            }
+        }
+
+        let data_to_copy = self.read_buffer.split_to(required_len);
+
+        // SAFETY: `data_to_copy` holds exactly `required_len` bytes and `buffer` has a
+        // capacity of `required_len`, so the copy stays in bounds. The source comes from
+        // `self.read_buffer`, a different allocation from `buffer`, so they cannot
+        // overlap, and afterwards the first `required_len` bytes of `buffer` are set.
+        unsafe {
+            ptr::copy_nonoverlapping(data_to_copy.as_ptr(), buffer.as_buf_mut_ptr(), required_len);
+            buffer.set_buf_init(required_len);
+        }
+
+        (Ok(()), buffer)
+    }
+
+    async fn send_empty_ok_response(&mut self) -> Result<(), IggyError> {
+        self.send_ok_response(&[]).await
+    }
+
+    async fn send_ok_response(&mut self, payload: &[u8]) -> Result<(), IggyError> {
+        debug!(
+            "Sending WebSocket TLS response with status: OK, payload length: {}",
+            payload.len()
+        );
+
+        // NOTE(review): a payload longer than u32::MAX bytes would be truncated here.
+        let length = (payload.len() as u32).to_le_bytes();
+        let total_size = STATUS_OK.len() + length.len() + payload.len();
+
+        if self.write_buffer.len() + total_size > self.write_buffer.capacity() {
+            self.flush_write_buffer().await?;
+        }
+
+        self.write_buffer.put_slice(STATUS_OK);
+        self.write_buffer.put_slice(&length);
+        self.write_buffer.put_slice(payload);
+
+        self.flush_write_buffer().await
+    }
+
+    async fn send_error_response(&mut self, error: IggyError) -> Result<(), IggyError> {
+        let status = &error.as_code().to_le_bytes();
+        debug!(
+            "Sending WebSocket TLS error response with status: {:?}",
+            status
+        );
+        let length = 0u32.to_le_bytes();
+        let total_size = status.len() + length.len();
+
+        if self.write_buffer.len() + total_size > self.write_buffer.capacity() {
+            self.flush_write_buffer().await?;
+        }
+        self.write_buffer.put_slice(status);
+        self.write_buffer.put_slice(&length);
+        self.flush_write_buffer().await
+    }
+
+    /// Flushes pending output, then closes the WebSocket; a connection that is
+    /// already closed counts as a successful shutdown.
+    async fn shutdown(&mut self) -> Result<(), std::io::Error> {
+        self.flush_write_buffer()
+            .await
+            .map_err(|e| std::io::Error::other(e.to_string()))?;
+
+        match self.stream.close(None).await {
+            Ok(_) => Ok(()),
+            Err(e) => match e {
+                TungsteniteError::ConnectionClosed | TungsteniteError::AlreadyClosed => {
+                    debug!("WebSocket TLS connection already closed: {}", e);
+                    Ok(())
+                }
+                _ => Err(std::io::Error::other(format!(
+                    "Failed to close WebSocket TLS connection: {}",
+                    e
+                ))),
+            },
+        }
+    }
+
+    /// Flushes anything already staged, then sends the status, `length`, and all
+    /// `slices` together as one binary message.
+    async fn send_ok_response_vectored(
+        &mut self,
+        length: &[u8],
+        slices: Vec<PooledBuffer>,
+    ) -> Result<(), IggyError> {
+        self.flush_write_buffer().await?;
+
+        let total_payload_size = slices.iter().map(|s| s.len()).sum::<usize>();
+        let total_size = STATUS_OK.len() + length.len() + total_payload_size;
+
+        let mut response_bytes = BytesMut::with_capacity(total_size);
+        response_bytes.put_slice(STATUS_OK);
+        response_bytes.put_slice(length);
+        for slice in slices {
+            response_bytes.put_slice(&slice);
+        }
+
+        self.stream
+            .send(Message::Binary(response_bytes.freeze()))
+            .await
+            .map_err(|e| {
+                map_send_error(e, "Broken pipe in vectored send - client closed connection")
+            })
+    }
+}
diff --git a/core/message_bus/src/lib.rs b/core/message_bus/src/lib.rs index b6c4b5de2..f40fc72d4 100644 --- a/core/message_bus/src/lib.rs +++ b/core/message_bus/src/lib.rs @@ -17,6 +17,9 @@ pub trait MessageBus {} -pub struct IggyMessageBus; +pub struct IggyMessageBus { + client_connections: HashMap<u128, ClientConnection> + + } impl MessageBus for IggyMessageBus {} diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index 6fa817948..7a2503b22 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -63,8 +63,8 @@ dashmap = { workspace = true } derive_more = { workspace = true } dotenvy = {
workspace = true } enum_dispatch = { workspace = true } -err_trail = { version = "0.10.2" } -error_set = { version = "0.9.0" } +err_trail = { workspace = true } +error_set = { workspace = true } figlet-rs = { workspace = true } figment = { workspace = true } flume = { workspace = true } diff --git a/core/server/src/configs/defaults.rs b/core/server/src/configs/defaults.rs index 9849dc8fc..2b205252c 100644 --- a/core/server/src/configs/defaults.rs +++ b/core/server/src/configs/defaults.rs @@ -17,7 +17,6 @@ */ use super::sharding::ShardingConfig; -use super::system::MemoryPoolConfig; use super::tcp::TcpSocketConfig; use crate::configs::cluster::CurrentNodeConfig; use crate::configs::cluster::{ClusterConfig, NodeConfig, OtherNodeConfig, TransportPorts}; @@ -37,7 +36,7 @@ use crate::configs::system::{ }; use crate::configs::tcp::{TcpConfig, TcpTlsConfig}; use crate::configs::websocket::{WebSocketConfig, WebSocketTlsConfig}; -use iggy_common::IggyByteSize; +use iggy_common::{IggyByteSize, MemoryPoolConfig}; use iggy_common::IggyDuration; use std::sync::Arc; use std::time::Duration; diff --git a/core/server/src/configs/system.rs b/core/server/src/configs/system.rs index fae172ca8..d0060818d 100644 --- a/core/server/src/configs/system.rs +++ b/core/server/src/configs/system.rs @@ -135,13 +135,6 @@ pub struct RecoveryConfig { pub recreate_missing_state: bool, } -#[derive(Debug, Deserialize, Serialize)] -pub struct MemoryPoolConfig { - pub enabled: bool, - pub size: IggyByteSize, - pub bucket_capacity: u32, -} - #[serde_as] #[derive(Debug, Deserialize, Serialize)] pub struct SegmentConfig {
