This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch io_uring_tpc in repository https://gitbox.apache.org/repos/asf/iggy.git
commit cbbd351765c0fe5e9bc1ee81885a9365edcb1d35 Author: numinex <[email protected]> AuthorDate: Fri Jun 20 10:52:31 2025 +0200 refactors --- Cargo.lock | 13 +++++ Cargo.toml | 1 + core/server/Cargo.toml | 1 + core/server/src/bootstrap.rs | 7 +-- core/server/src/main.rs | 11 ++-- core/server/src/shard/builder.rs | 8 ++- core/server/src/shard/frame.rs | 67 ---------------------- core/server/src/shard/mod.rs | 17 +++--- .../src/shard/{ => transmission}/connector.rs | 49 ++++++++++++---- core/server/src/shard/transmission/frame.rs | 42 ++++++++++++++ core/server/src/shard/transmission/message.rs | 22 +++++++ core/server/src/shard/transmission/mod.rs | 3 + 12 files changed, 142 insertions(+), 99 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f165ff62..54f19c07 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -466,6 +466,18 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-channel" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-compression" version = "0.4.25" @@ -6876,6 +6888,7 @@ version = "0.5.0" dependencies = [ "ahash 0.8.12", "anyhow", + "async-channel", "async_zip", "axum 0.8.4", "axum-server", diff --git a/Cargo.toml b/Cargo.toml index 0bda2c80..70629e4c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -89,6 +89,7 @@ dlopen2 = "0.8.0" enum_dispatch = "0.3.13" figlet-rs = "0.1.5" flume = "0.11.1" +async-channel = "2.3.1" futures = "0.3.31" futures-util = "0.3.31" human-repr = "1.1.0" diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index 26ce895d..e612af10 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -56,6 +56,7 @@ error_set = { version = "0.8.5", features = ["tracing"] } figlet-rs = { workspace = true } figment = { version = "0.10.19", features = ["toml", "env"] } flume = { workspace = true } +async-channel = { workspace = true } futures = { workspace = true } human-repr = { workspace = true } iggy_common = { workspace = true } diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs index d70d0317..c6108568 100644 --- a/core/server/src/bootstrap.rs +++ b/core/server/src/bootstrap.rs @@ -5,14 +5,14 @@ use iggy_common::{ MIN_PASSWORD_LENGTH, MIN_USERNAME_LENGTH, }, }; -use monoio::{fs::create_dir_all, time::TimeDriver, Buildable, Driver, Runtime}; +use monoio::{Buildable, Driver, Runtime, fs::create_dir_all, time::TimeDriver}; use tracing::info; use crate::{ IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV, configs::{config_provider::ConfigProviderKind, server::ServerConfig, system::SystemConfig}, server_error::ServerError, - shard::{connector::ShardConnector, frame::ShardFrame}, + shard::{transmission::connector::ShardConnector, transmission::frame::ShardFrame}, streaming::users::user::User, }; use std::{env, fs::remove_dir_all, ops::Range, path::Path}; @@ -146,8 +146,7 @@ where rt } -pub fn create_shard_executor() -> Runtime<TimeDriver<monoio::IoUringDriver>> -{ +pub fn create_shard_executor() -> Runtime<TimeDriver<monoio::IoUringDriver>> { // TODO: Figure out what else we could tweak there // We for sure want to disable the userspace interrupts on new cq entry (set_coop_taskrun) // let urb = io_uring::IoUring::builder(); diff --git a/core/server/src/main.rs b/core/server/src/main.rs index e7409e49..67728862 100644 --- a/core/server/src/main.rs +++ b/core/server/src/main.rs @@ -25,7 +25,8 @@ use error_set::ErrContext; use figlet_rs::FIGfont; use server::args::Args; use server::bootstrap::{ - create_default_executor, create_directories, create_root_user, create_shard_connections, create_shard_executor, load_config + create_default_executor, create_directories, create_root_user, create_shard_connections, + create_shard_executor, load_config, }; use server::channels::commands::archive_state::ArchiveStateExecutor; use server::channels::commands::clean_personal_access_tokens::CleanPersonalAccessTokensExecutor; @@ -126,14 +127,10 @@ fn main() -> Result<(), ServerError> { // TODO: Make this configurable from config as a range // for example this instance of Iggy will use cores from 0..4 - let available_cpus = available_parallelism() - .expect("Failed to get num of cores") - .into(); - let shards_count = available_cpus; + let available_cpus = available_parallelism().expect("Failed to get num of cores"); + let shards_count = available_cpus.into(); let shards_set = 0..shards_count; - let connections = create_shard_connections(shards_set.clone()); - for shard_id in shards_set { let id = shard_id as u16; let connections = connections.clone(); diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs index 06366ebf..36fe0e54 100644 --- a/core/server/src/shard/builder.rs +++ b/core/server/src/shard/builder.rs @@ -33,7 +33,7 @@ use crate::{ versioning::SemanticVersion, }; -use super::{IggyShard, connector::ShardConnector, frame::ShardFrame}; +use super::{IggyShard, transmission::connector::ShardConnector, transmission::frame::ShardFrame}; #[derive(Default)] pub struct IggyShardBuilder { @@ -76,6 +76,7 @@ impl IggyShardBuilder { .next() .expect("Failed to find connection with the specified ID"); let shards = connections.into_iter().map(Shard::new).collect(); + //TODO: This can be discrete step in the builder bootstrapped from main function. let version = SemanticVersion::current().expect("Invalid version"); @@ -102,7 +103,10 @@ impl IggyShardBuilder { //TODO: This can be discrete step in the builder bootstrapped from main function. let partition_persister = Self::resolve_persister(config.system.partition.enforce_fsync); - let storage = Rc::new(SystemStorage::new(config.system.clone(), partition_persister)); + let storage = Rc::new(SystemStorage::new( + config.system.clone(), + partition_persister, + )); IggyShard { id: id, diff --git a/core/server/src/shard/frame.rs b/core/server/src/shard/frame.rs deleted file mode 100644 index a2c1220f..00000000 --- a/core/server/src/shard/frame.rs +++ /dev/null @@ -1,67 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -use bytes::Bytes; -use flume::Sender; -use iggy_common::IggyError; - -#[derive(Debug, Clone)] -pub enum ShardMessage { - //TODO: Fixme - //Command(ServerCommand), - Event(ShardEvent), -} - -#[derive(Debug, Clone)] -pub enum ShardEvent {} - -#[derive(Debug)] -pub enum ShardResponse { - BinaryResponse(Bytes), - ErrorResponse(IggyError), -} - -#[derive(Debug, Clone)] -pub struct ShardFrame { - pub client_id: u32, - pub message: ShardMessage, - pub response_sender: Option<Sender<ShardResponse>>, -} - -impl ShardFrame { - pub fn new( - client_id: u32, - message: ShardMessage, - response_sender: Option<Sender<ShardResponse>>, - ) -> Self { - Self { - client_id, - message, - response_sender, - } - } -} - -#[macro_export] -macro_rules! handle_response { - ($sender:expr, $response:expr) => { - match $response { - ShardResponse::BinaryResponse(payload) => $sender.send_ok_response(&payload).await?, - ShardResponse::ErrorResponse(err) => $sender.send_error_response(err).await?, - } - }; -} diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index b98f6381..6a6d35b8 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -17,25 +17,27 @@ */ pub mod builder; -pub mod connector; -pub mod frame; pub mod namespace; +pub mod transmission; use ahash::HashMap; use builder::IggyShardBuilder; -use connector::{Receiver, ShardConnector, StopReceiver, StopSender}; -use frame::ShardFrame; use iggy_common::IggyError; use namespace::IggyNamespace; -use tracing::info; use std::{ cell::{Cell, RefCell}, rc::Rc, - sync::Arc, time::Instant, + sync::Arc, + time::Instant, }; +use tracing::info; +use transmission::connector::{Receiver, ShardConnector, StopReceiver, StopSender}; use crate::{ - bootstrap::create_root_user, configs::server::ServerConfig, state::{file::FileState, StateKind}, + bootstrap::create_root_user, + configs::server::ServerConfig, + shard::transmission::frame::ShardFrame, + state::{StateKind, file::FileState}, streaming::storage::SystemStorage, }; pub(crate) struct Shard { @@ -68,7 +70,6 @@ pub struct IggyShard { // TODO: Refactor. pub(crate) storage: Rc<SystemStorage>, - // TODO - get rid of this dynamic dispatch. pub(crate) state: Rc<StateKind>, //pub(crate) encryptor: Option<Rc<dyn Encryptor>>, config: ServerConfig, diff --git a/core/server/src/shard/connector.rs b/core/server/src/shard/transmission/connector.rs similarity index 74% rename from core/server/src/shard/connector.rs rename to core/server/src/shard/transmission/connector.rs index 95aedc73..01ca4147 100644 --- a/core/server/src/shard/connector.rs +++ b/core/server/src/shard/transmission/connector.rs @@ -22,10 +22,9 @@ use std::{ task::Poll, }; -pub type StopSender = flume::Sender<()>; -pub type StopReceiver = flume::Receiver<()>; +pub type StopSender = async_channel::Sender<()>; +pub type StopReceiver = async_channel::Receiver<()>; -#[derive(Clone)] pub struct ShardConnector<T> { pub id: u16, pub sender: Sender<T>, @@ -34,12 +33,25 @@ pub struct ShardConnector<T> { pub stop_sender: StopSender, } -// TODO(numinex) - replace flume with async_channel -impl<T: Clone> ShardConnector<T> { +impl<T> Clone for ShardConnector<T> { + fn clone(&self) -> Self { + Self { + id: self.id, + sender: self.sender.clone(), + receiver: self.receiver.clone(), + stop_receiver: self.stop_receiver.clone(), + stop_sender: self.stop_sender.clone(), + } + } +} + +// TODO(numinex) - replace async_channel with some other form of one shot async channel. +// !!!!!IMPORTANT!!!! the one shot channel Sender/Receiver has to be Cloneable!!!!! +impl<T> ShardConnector<T> { pub fn new(id: u16, max_concurrent_thread_count: usize) -> Self { let channel = Arc::new(ShardedChannel::new(max_concurrent_thread_count)); let (sender, receiver) = channel.unbounded(); - let (stop_sender, stop_receiver) = flume::bounded(1); + let (stop_sender, stop_receiver) = async_channel::bounded(1); Self { id, receiver, @@ -55,17 +67,32 @@ impl<T: Clone> ShardConnector<T> { } // TODO: I think those Arcs can be replaced with 'static lifetimes... -// Those shards will live for the entire duration of the application. -#[derive(Clone)] +// Those connectors will live for the duration of the shard itself +// and the shard lives for the duration of the entire application. pub struct Receiver<T> { channel: Arc<ShardedChannel<T>>, } -#[derive(Clone)] +impl<T> Clone for Receiver<T> { + fn clone(&self) -> Self { + Self { + channel: self.channel.clone(), + } + } +} + pub struct Sender<T> { channel: Arc<ShardedChannel<T>>, } +impl<T> Clone for Sender<T> { + fn clone(&self) -> Self { + Self { + channel: self.channel.clone(), + } + } +} + impl<T> Sender<T> { pub fn send(&self, data: T) { self.channel @@ -94,13 +121,13 @@ impl<T> ShardedChannel<T> { } } -pub trait ShardedChannelsSplit<T: Clone> { +pub trait ShardedChannelsSplit<T> { fn unbounded(&self) -> (Sender<T>, Receiver<T>); fn sender(&self) -> Sender<T>; } -impl<T: Clone> ShardedChannelsSplit<T> for Arc<ShardedChannel<T>> { +impl<T> ShardedChannelsSplit<T> for Arc<ShardedChannel<T>> { fn unbounded(&self) -> (Sender<T>, Receiver<T>) { let tx = self.sender(); let rx = Receiver { diff --git a/core/server/src/shard/transmission/frame.rs b/core/server/src/shard/transmission/frame.rs new file mode 100644 index 00000000..ac60863d --- /dev/null +++ b/core/server/src/shard/transmission/frame.rs @@ -0,0 +1,42 @@ +use async_channel::Sender; +use bytes::Bytes; +use iggy_common::IggyError; + +use crate::shard::transmission::message::ShardMessage; + +#[derive(Debug)] +pub struct ShardFrame { + pub client_id: u32, + pub message: ShardMessage, + pub response_sender: Option<Sender<ShardResponse>>, +} + +impl ShardFrame { + pub fn new( + client_id: u32, + message: ShardMessage, + response_sender: Option<Sender<ShardResponse>>, + ) -> Self { + Self { + client_id, + message, + response_sender, + } + } +} + +#[derive(Debug)] +pub enum ShardResponse { + BinaryResponse(Bytes), + ErrorResponse(IggyError), +} + +#[macro_export] +macro_rules! handle_response { + ($sender:expr, $response:expr) => { + match $response { + ShardResponse::BinaryResponse(payload) => $sender.send_ok_response(&payload).await?, + ShardResponse::ErrorResponse(err) => $sender.send_error_response(err).await?, + } + }; +} diff --git a/core/server/src/shard/transmission/message.rs b/core/server/src/shard/transmission/message.rs new file mode 100644 index 00000000..e9f1dd4b --- /dev/null +++ b/core/server/src/shard/transmission/message.rs @@ -0,0 +1,22 @@ +use crate::binary::command::ServerCommand; + +#[derive(Debug)] +pub enum ShardMessage { + Command(ServerCommand), + Event(ShardEvent), +} + +#[derive(Debug)] +pub enum ShardEvent {} + +impl From<ServerCommand> for ShardMessage { + fn from(command: ServerCommand) -> Self { + ShardMessage::Command(command) + } +} + +impl From<ShardEvent> for ShardMessage { + fn from(event: ShardEvent) -> Self { + ShardMessage::Event(event) + } +} diff --git a/core/server/src/shard/transmission/mod.rs b/core/server/src/shard/transmission/mod.rs new file mode 100644 index 00000000..b871152a --- /dev/null +++ b/core/server/src/shard/transmission/mod.rs @@ -0,0 +1,3 @@ +pub mod connector; +pub mod frame; +pub mod message;
