This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit cbbd351765c0fe5e9bc1ee81885a9365edcb1d35
Author: numinex <[email protected]>
AuthorDate: Fri Jun 20 10:52:31 2025 +0200

    refactors
---
 Cargo.lock                                         | 13 +++++
 Cargo.toml                                         |  1 +
 core/server/Cargo.toml                             |  1 +
 core/server/src/bootstrap.rs                       |  7 +--
 core/server/src/main.rs                            | 11 ++--
 core/server/src/shard/builder.rs                   |  8 ++-
 core/server/src/shard/frame.rs                     | 67 ----------------------
 core/server/src/shard/mod.rs                       | 17 +++---
 .../src/shard/{ => transmission}/connector.rs      | 49 ++++++++++++----
 core/server/src/shard/transmission/frame.rs        | 42 ++++++++++++++
 core/server/src/shard/transmission/message.rs      | 22 +++++++
 core/server/src/shard/transmission/mod.rs          |  3 +
 12 files changed, 142 insertions(+), 99 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index f165ff62..54f19c07 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -466,6 +466,18 @@ dependencies = [
  "pin-project-lite",
 ]
 
+[[package]]
+name = "async-channel"
+version = "2.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a"
+dependencies = [
+ "concurrent-queue",
+ "event-listener-strategy",
+ "futures-core",
+ "pin-project-lite",
+]
+
 [[package]]
 name = "async-compression"
 version = "0.4.25"
@@ -6876,6 +6888,7 @@ version = "0.5.0"
 dependencies = [
  "ahash 0.8.12",
  "anyhow",
+ "async-channel",
  "async_zip",
  "axum 0.8.4",
  "axum-server",
diff --git a/Cargo.toml b/Cargo.toml
index 0bda2c80..70629e4c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -89,6 +89,7 @@ dlopen2 = "0.8.0"
 enum_dispatch = "0.3.13"
 figlet-rs = "0.1.5"
 flume = "0.11.1"
+async-channel = "2.3.1"
 futures = "0.3.31"
 futures-util = "0.3.31"
 human-repr = "1.1.0"
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 26ce895d..e612af10 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -56,6 +56,7 @@ error_set = { version = "0.8.5", features = ["tracing"] }
 figlet-rs = { workspace = true }
 figment = { version = "0.10.19", features = ["toml", "env"] }
 flume = { workspace = true }
+async-channel = { workspace = true }
 futures = { workspace = true }
 human-repr = { workspace = true }
 iggy_common = { workspace = true }
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index d70d0317..c6108568 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -5,14 +5,14 @@ use iggy_common::{
         MIN_PASSWORD_LENGTH, MIN_USERNAME_LENGTH,
     },
 };
-use monoio::{fs::create_dir_all, time::TimeDriver, Buildable, Driver, Runtime};
+use monoio::{Buildable, Driver, Runtime, fs::create_dir_all, time::TimeDriver};
 use tracing::info;
 
 use crate::{
     IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV,
     configs::{config_provider::ConfigProviderKind, server::ServerConfig, system::SystemConfig},
     server_error::ServerError,
-    shard::{connector::ShardConnector, frame::ShardFrame},
+    shard::{transmission::connector::ShardConnector, transmission::frame::ShardFrame},
     streaming::users::user::User,
 };
 use std::{env, fs::remove_dir_all, ops::Range, path::Path};
@@ -146,8 +146,7 @@ where
     rt
 }
 
-pub fn create_shard_executor() -> Runtime<TimeDriver<monoio::IoUringDriver>>
-{
+pub fn create_shard_executor() -> Runtime<TimeDriver<monoio::IoUringDriver>> {
     // TODO: Figure out what else we could tweak there
     // We for sure want to disable the userspace interrupts on new cq entry (set_coop_taskrun)
     // let urb = io_uring::IoUring::builder();
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index e7409e49..67728862 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -25,7 +25,8 @@ use error_set::ErrContext;
 use figlet_rs::FIGfont;
 use server::args::Args;
 use server::bootstrap::{
-    create_default_executor, create_directories, create_root_user, create_shard_connections, create_shard_executor, load_config
+    create_default_executor, create_directories, create_root_user, create_shard_connections,
+    create_shard_executor, load_config,
 };
 use server::channels::commands::archive_state::ArchiveStateExecutor;
 use server::channels::commands::clean_personal_access_tokens::CleanPersonalAccessTokensExecutor;
@@ -126,14 +127,10 @@ fn main() -> Result<(), ServerError> {
 
     // TODO: Make this configurable from config as a range
     // for example this instance of Iggy will use cores from 0..4
-    let available_cpus = available_parallelism()
-        .expect("Failed to get num of cores")
-        .into();
-    let shards_count = available_cpus;
+    let available_cpus = available_parallelism().expect("Failed to get num of cores");
+    let shards_count = available_cpus.into();
     let shards_set = 0..shards_count;
-
     let connections = create_shard_connections(shards_set.clone());
-
     for shard_id in shards_set {
         let id = shard_id as u16;
         let connections = connections.clone();
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index 06366ebf..36fe0e54 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -33,7 +33,7 @@ use crate::{
     versioning::SemanticVersion,
 };
 
-use super::{IggyShard, connector::ShardConnector, frame::ShardFrame};
+use super::{IggyShard, transmission::connector::ShardConnector, transmission::frame::ShardFrame};
 
 #[derive(Default)]
 pub struct IggyShardBuilder {
@@ -76,6 +76,7 @@ impl IggyShardBuilder {
             .next()
             .expect("Failed to find connection with the specified ID");
         let shards = connections.into_iter().map(Shard::new).collect();
+
         //TODO: This can be discrete step in the builder bootstrapped from main function.
         let version = SemanticVersion::current().expect("Invalid version");
 
@@ -102,7 +103,10 @@ impl IggyShardBuilder {
 
         //TODO: This can be discrete step in the builder bootstrapped from main function.
         let partition_persister = Self::resolve_persister(config.system.partition.enforce_fsync);
-        let storage = Rc::new(SystemStorage::new(config.system.clone(), partition_persister));
+        let storage = Rc::new(SystemStorage::new(
+            config.system.clone(),
+            partition_persister,
+        ));
 
         IggyShard {
             id: id,
diff --git a/core/server/src/shard/frame.rs b/core/server/src/shard/frame.rs
deleted file mode 100644
index a2c1220f..00000000
--- a/core/server/src/shard/frame.rs
+++ /dev/null
@@ -1,67 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-use bytes::Bytes;
-use flume::Sender;
-use iggy_common::IggyError;
-
-#[derive(Debug, Clone)]
-pub enum ShardMessage {
-    //TODO: Fixme
-    //Command(ServerCommand),
-    Event(ShardEvent),
-}
-
-#[derive(Debug, Clone)]
-pub enum ShardEvent {}
-
-#[derive(Debug)]
-pub enum ShardResponse {
-    BinaryResponse(Bytes),
-    ErrorResponse(IggyError),
-}
-
-#[derive(Debug, Clone)]
-pub struct ShardFrame {
-    pub client_id: u32,
-    pub message: ShardMessage,
-    pub response_sender: Option<Sender<ShardResponse>>,
-}
-
-impl ShardFrame {
-    pub fn new(
-        client_id: u32,
-        message: ShardMessage,
-        response_sender: Option<Sender<ShardResponse>>,
-    ) -> Self {
-        Self {
-            client_id,
-            message,
-            response_sender,
-        }
-    }
-}
-
-#[macro_export]
-macro_rules! handle_response {
-    ($sender:expr, $response:expr) => {
-        match $response {
-            ShardResponse::BinaryResponse(payload) => $sender.send_ok_response(&payload).await?,
-            ShardResponse::ErrorResponse(err) => $sender.send_error_response(err).await?,
-        }
-    };
-}
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index b98f6381..6a6d35b8 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -17,25 +17,27 @@
  */
 
 pub mod builder;
-pub mod connector;
-pub mod frame;
 pub mod namespace;
+pub mod transmission;
 
 use ahash::HashMap;
 use builder::IggyShardBuilder;
-use connector::{Receiver, ShardConnector, StopReceiver, StopSender};
-use frame::ShardFrame;
 use iggy_common::IggyError;
 use namespace::IggyNamespace;
-use tracing::info;
 use std::{
     cell::{Cell, RefCell},
     rc::Rc,
-    sync::Arc, time::Instant,
+    sync::Arc,
+    time::Instant,
 };
+use tracing::info;
+use transmission::connector::{Receiver, ShardConnector, StopReceiver, StopSender};
 
 use crate::{
-    bootstrap::create_root_user, configs::server::ServerConfig, state::{file::FileState, StateKind},
+    bootstrap::create_root_user,
+    configs::server::ServerConfig,
+    shard::transmission::frame::ShardFrame,
+    state::{StateKind, file::FileState},
     streaming::storage::SystemStorage,
 };
 pub(crate) struct Shard {
@@ -68,7 +70,6 @@ pub struct IggyShard {
     // TODO: Refactor.
     pub(crate) storage: Rc<SystemStorage>,
 
-    // TODO - get rid of this dynamic dispatch.
     pub(crate) state: Rc<StateKind>,
     //pub(crate) encryptor: Option<Rc<dyn Encryptor>>,
     config: ServerConfig,
diff --git a/core/server/src/shard/connector.rs b/core/server/src/shard/transmission/connector.rs
similarity index 74%
rename from core/server/src/shard/connector.rs
rename to core/server/src/shard/transmission/connector.rs
index 95aedc73..01ca4147 100644
--- a/core/server/src/shard/connector.rs
+++ b/core/server/src/shard/transmission/connector.rs
@@ -22,10 +22,9 @@ use std::{
     task::Poll,
 };
 
-pub type StopSender = flume::Sender<()>;
-pub type StopReceiver = flume::Receiver<()>;
+pub type StopSender = async_channel::Sender<()>;
+pub type StopReceiver = async_channel::Receiver<()>;
 
-#[derive(Clone)]
 pub struct ShardConnector<T> {
     pub id: u16,
     pub sender: Sender<T>,
@@ -34,12 +33,25 @@ pub struct ShardConnector<T> {
     pub stop_sender: StopSender,
 }
 
-// TODO(numinex) - replace flume with async_channel
-impl<T: Clone> ShardConnector<T> {
+impl<T> Clone for ShardConnector<T> {
+    fn clone(&self) -> Self {
+        Self {
+            id: self.id,
+            sender: self.sender.clone(),
+            receiver: self.receiver.clone(),
+            stop_receiver: self.stop_receiver.clone(),
+            stop_sender: self.stop_sender.clone(),
+        }
+    }
+}
+
+// TODO(numinex) - replace async_channel with some other form of one shot async channel.
+// !!!!!IMPORTANT!!!! the one shot channel Sender/Receiver has to be Cloneable!!!!!
+impl<T> ShardConnector<T> {
     pub fn new(id: u16, max_concurrent_thread_count: usize) -> Self {
         let channel = Arc::new(ShardedChannel::new(max_concurrent_thread_count));
         let (sender, receiver) = channel.unbounded();
-        let (stop_sender, stop_receiver) = flume::bounded(1);
+        let (stop_sender, stop_receiver) = async_channel::bounded(1);
         Self {
             id,
             receiver,
@@ -55,17 +67,32 @@ impl<T: Clone> ShardConnector<T> {
 }
 
 // TODO: I think those Arcs can be replaced with 'static lifetimes...
-// Those shards will live for the entire duration of the application.
-#[derive(Clone)]
+// Those connectors will live for the duration of the shard itself
+// and the shard lives for the duration of the entire application.
 pub struct Receiver<T> {
     channel: Arc<ShardedChannel<T>>,
 }
 
-#[derive(Clone)]
+impl<T> Clone for Receiver<T> {
+    fn clone(&self) -> Self {
+        Self {
+            channel: self.channel.clone(),
+        }
+    }
+}
+
 pub struct Sender<T> {
     channel: Arc<ShardedChannel<T>>,
 }
 
+impl<T> Clone for Sender<T> {
+    fn clone(&self) -> Self {
+        Self {
+            channel: self.channel.clone(),
+        }
+    }
+}
+
 impl<T> Sender<T> {
     pub fn send(&self, data: T) {
         self.channel
@@ -94,13 +121,13 @@ impl<T> ShardedChannel<T> {
     }
 }
 
-pub trait ShardedChannelsSplit<T: Clone> {
+pub trait ShardedChannelsSplit<T> {
     fn unbounded(&self) -> (Sender<T>, Receiver<T>);
 
     fn sender(&self) -> Sender<T>;
 }
 
-impl<T: Clone> ShardedChannelsSplit<T> for Arc<ShardedChannel<T>> {
+impl<T> ShardedChannelsSplit<T> for Arc<ShardedChannel<T>> {
     fn unbounded(&self) -> (Sender<T>, Receiver<T>) {
         let tx = self.sender();
         let rx = Receiver {
diff --git a/core/server/src/shard/transmission/frame.rs b/core/server/src/shard/transmission/frame.rs
new file mode 100644
index 00000000..ac60863d
--- /dev/null
+++ b/core/server/src/shard/transmission/frame.rs
@@ -0,0 +1,42 @@
+use async_channel::Sender;
+use bytes::Bytes;
+use iggy_common::IggyError;
+
+use crate::shard::transmission::message::ShardMessage;
+
+#[derive(Debug)]
+pub struct ShardFrame {
+    pub client_id: u32,
+    pub message: ShardMessage,
+    pub response_sender: Option<Sender<ShardResponse>>,
+}
+
+impl ShardFrame {
+    pub fn new(
+        client_id: u32,
+        message: ShardMessage,
+        response_sender: Option<Sender<ShardResponse>>,
+    ) -> Self {
+        Self {
+            client_id,
+            message,
+            response_sender,
+        }
+    }
+}
+
+#[derive(Debug)]
+pub enum ShardResponse {
+    BinaryResponse(Bytes),
+    ErrorResponse(IggyError),
+}
+
+#[macro_export]
+macro_rules! handle_response {
+    ($sender:expr, $response:expr) => {
+        match $response {
+            ShardResponse::BinaryResponse(payload) => $sender.send_ok_response(&payload).await?,
+            ShardResponse::ErrorResponse(err) => $sender.send_error_response(err).await?,
+        }
+    };
+}
diff --git a/core/server/src/shard/transmission/message.rs b/core/server/src/shard/transmission/message.rs
new file mode 100644
index 00000000..e9f1dd4b
--- /dev/null
+++ b/core/server/src/shard/transmission/message.rs
@@ -0,0 +1,22 @@
+use crate::binary::command::ServerCommand;
+
+#[derive(Debug)]
+pub enum ShardMessage {
+    Command(ServerCommand),
+    Event(ShardEvent),
+}
+
+#[derive(Debug)]
+pub enum ShardEvent {}
+
+impl From<ServerCommand> for ShardMessage {
+    fn from(command: ServerCommand) -> Self {
+        ShardMessage::Command(command)
+    }
+}
+
+impl From<ShardEvent> for ShardMessage {
+    fn from(event: ShardEvent) -> Self {
+        ShardMessage::Event(event)
+    }
+}
diff --git a/core/server/src/shard/transmission/mod.rs b/core/server/src/shard/transmission/mod.rs
new file mode 100644
index 00000000..b871152a
--- /dev/null
+++ b/core/server/src/shard/transmission/mod.rs
@@ -0,0 +1,3 @@
+pub mod connector;
+pub mod frame;
+pub mod message;

Reply via email to