This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/io_uring_tpc by this push:
     new 77cce6ee feat(io_uring): remove old storage from the shard (#2214)
77cce6ee is described below

commit 77cce6ee31f649a2fae25dded9d226860d90644e
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Sat Sep 27 12:01:31 2025 +0200

    feat(io_uring): remove old storage from the shard (#2214)
---
 core/server/src/http/http_server.rs  |  9 ++++-----
 core/server/src/main.rs              |  5 ++---
 core/server/src/shard/builder.rs     |  9 ---------
 core/server/src/shard/mod.rs         | 20 ++++----------------
 core/server/src/streaming/storage.rs |  1 +
 5 files changed, 11 insertions(+), 33 deletions(-)

diff --git a/core/server/src/http/http_server.rs 
b/core/server/src/http/http_server.rs
index e6441c99..2ae46271 100644
--- a/core/server/src/http/http_server.rs
+++ b/core/server/src/http/http_server.rs
@@ -26,6 +26,7 @@ use crate::http::metrics::metrics;
 use crate::http::shared::AppState;
 use crate::http::*;
 use crate::shard::IggyShard;
+use crate::streaming::persistence::persister::PersisterKind;
 // use crate::streaming::systems::system::SharedSystem;
 use axum::extract::DefaultBodyLimit;
 use axum::extract::connect_info::Connected;
@@ -65,7 +66,7 @@ impl<'a> Connected<cyper_axum::IncomingStream<'a, 
TcpListener>> for CompioSocket
 
 /// Starts the HTTP API server.
 /// Returns the address the server is listening on.
-pub async fn start(config: HttpConfig, shard: Rc<IggyShard>) -> Result<(), 
IggyError> {
+pub async fn start(config: HttpConfig, persister: Arc<PersisterKind>, shard: 
Rc<IggyShard>) -> Result<(), IggyError> {
     if shard.id != 0 {
         info!(
             "HTTP server disabled for shard {} (only runs on shard 0)",
@@ -80,7 +81,7 @@ pub async fn start(config: HttpConfig, shard: Rc<IggyShard>) 
-> Result<(), IggyE
         "HTTP API"
     };
 
-    let app_state = build_app_state(&config, shard).await;
+    let app_state = build_app_state(&config, persister, shard).await;
     let mut app = Router::new()
         .merge(system::router(app_state.clone(), &config.metrics))
         .merge(personal_access_tokens::router(app_state.clone()))
@@ -157,12 +158,10 @@ pub async fn start(config: HttpConfig, shard: 
Rc<IggyShard>) -> Result<(), IggyE
     }
 }
 
-async fn build_app_state(config: &HttpConfig, shard: Rc<IggyShard>) -> 
Arc<AppState> {
+async fn build_app_state(config: &HttpConfig, persister: Arc<PersisterKind>,  
shard: Rc<IggyShard>) -> Arc<AppState> {
     let tokens_path;
-    let persister;
     {
         tokens_path = shard.config.system.get_state_tokens_path();
-        persister = shard.storage.persister.clone();
     }
 
     let jwt_manager = JwtManager::from_config(persister, &tokens_path, 
&config.jwt);
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index bd1b73c7..095ba4c2 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -286,7 +286,7 @@ async fn main() -> Result<(), ServerError> {
         let streams = streams.clone();
         let shards_table = shards_table.clone();
         let users = users.clone();
-        let storage = storage.clone();
+        let persister = storage.persister.clone();
         let connections = connections.clone();
         let config = config.clone();
         let encryptor = encryptor.clone();
@@ -337,14 +337,13 @@ async fn main() -> Result<(), ServerError> {
                         .shards_table(shards_table)
                         .connections(connections)
                         .config(config)
-                        .storage(storage)
                         .encryptor(encryptor)
                         .version(current_version)
                         .metrics(metrics)
                         .build();
                     let shard = Rc::new(shard);
 
-                    if let Err(e) = shard.run().await {
+                    if let Err(e) = shard.run(persister).await {
                         error!("Failed to run shard-{id}: {e}");
                     }
                     shard_info!(shard.id, "Run completed");
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index fc203b77..287f31bb 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -53,7 +53,6 @@ pub struct IggyShardBuilder {
     config: Option<ServerConfig>,
     encryptor: Option<EncryptorKind>,
     version: Option<SemanticVersion>,
-    storage: Option<SystemStorage>,
     metrics: Option<Metrics>,
 }
 
@@ -106,11 +105,6 @@ impl IggyShardBuilder {
         self
     }
 
-    pub fn storage(mut self, storage: SystemStorage) -> Self {
-        self.storage = Some(storage);
-        self
-    }
-
     pub fn metrics(mut self, metrics: Metrics) -> Self {
         self.metrics = Some(metrics);
         self
@@ -125,7 +119,6 @@ impl IggyShardBuilder {
         let users = self.users.unwrap();
         let config = self.config.unwrap();
         let connections = self.connections.unwrap();
-        let storage = self.storage.unwrap();
         let encryptor = self.encryptor;
         let version = self.version.unwrap();
         let (stop_sender, stop_receiver, frame_receiver) = connections
@@ -141,7 +134,6 @@ impl IggyShardBuilder {
             .next()
             .expect("Failed to find connection with the specified ID");
         let shards = connections.into_iter().map(Shard::new).collect();
-        let storage = Rc::new(storage);
 
         // Initialize metrics
         let metrics = self.metrics.unwrap_or_else(|| Metrics::init());
@@ -151,7 +143,6 @@ impl IggyShardBuilder {
             shards_table,
             streams2: streams, // TODO: Fixme
             users: RefCell::new(users),
-            storage: storage,
             encryptor: encryptor,
             config: config,
             version: version,
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 5d3e6186..534ddfe3 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -75,22 +75,10 @@ use crate::{
         traits_ext::{EntityComponentSystem, EntityMarker, Insert},
     },
     state::{
-        StateKind,
-        file::FileState,
-        system::{StreamState, SystemState, UserState},
+        file::FileState, system::{StreamState, SystemState, UserState}, 
StateKind
     },
     streaming::{
-        clients::client_manager::ClientManager,
-        diagnostics::metrics::Metrics,
-        partitions,
-        personal_access_tokens::personal_access_token::PersonalAccessToken,
-        polling_consumer::PollingConsumer,
-        session::Session,
-        storage::SystemStorage,
-        streams, topics,
-        traits::MainOps,
-        users::{permissioner::Permissioner, user::User},
-        utils::ptr::EternalPtr,
+        clients::client_manager::ClientManager, diagnostics::metrics::Metrics, 
partitions, persistence::persister::PersisterKind, 
polling_consumer::PollingConsumer, session::Session, traits::MainOps, 
users::{permissioner::Permissioner, user::User}, utils::ptr::EternalPtr
     },
     tcp::tcp_server::spawn_tcp_server,
     versioning::SemanticVersion,
@@ -155,7 +143,6 @@ pub struct IggyShard {
     pub(crate) streams2: Streams,
     pub(crate) shards_table: EternalPtr<DashMap<IggyNamespace, ShardInfo>>,
     // TODO: Refactor.
-    pub(crate) storage: Rc<SystemStorage>,
     pub(crate) state: StateKind,
 
     // Temporal...
@@ -187,7 +174,7 @@ impl IggyShard {
         Ok(())
     }
 
-    pub async fn run(self: &Rc<Self>) -> Result<(), IggyError> {
+    pub async fn run(self: &Rc<Self>, persister: Arc<PersisterKind>) -> 
Result<(), IggyError> {
         // Workaround to ensure that the statistics are initialized before the 
server
         // loads streams and starts accepting connections. This is necessary to
         // have the correct statistics when the server starts.
@@ -208,6 +195,7 @@ impl IggyShard {
             println!("Starting HTTP server on shard: {}", self.id);
             tasks.push(Box::pin(http_server::start(
                 self.config.http.clone(),
+                persister,
                 self.clone(),
             )));
         }
diff --git a/core/server/src/streaming/storage.rs 
b/core/server/src/streaming/storage.rs
index ff64f689..03199d70 100644
--- a/core/server/src/streaming/storage.rs
+++ b/core/server/src/streaming/storage.rs
@@ -47,6 +47,7 @@ macro_rules! forward_async_methods {
     }
 }
 
+// TODO: Tech debt, how to get rid of this ? 
 #[derive(Debug)]
 pub enum SystemInfoStorageKind {
     File(FileSystemInfoStorage),

Reply via email to