This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 3c07044eac9e48f52c91293d36d48d29c60d249b
Author: numinex <[email protected]>
AuthorDate: Sat Jun 21 13:51:27 2025 +0200

    fix state initialization with gate
---
 core/server/src/bootstrap.rs                  |  48 +++------
 core/server/src/lib.rs                        |   8 +-
 core/server/src/main.rs                       | 138 +++++++++++++++++++++-----
 core/server/src/shard/builder.rs              |  62 +++++-------
 core/server/src/shard/mod.rs                  | 124 +++++++++++++++++++++--
 core/server/src/shard/transmission/frame.rs   |  17 ++++
 core/server/src/shard/transmission/message.rs |  17 ++++
 core/server/src/shard/transmission/mod.rs     |  18 ++++
 core/server/src/state/file.rs                 |   4 +-
 core/server/src/streaming/systems/info.rs     |  55 ----------
 core/server/src/streaming/systems/system.rs   |  11 +-
 core/server/src/streaming/utils/gate.rs       |  42 ++++++++
 core/server/src/streaming/utils/mod.rs        |   1 +
 13 files changed, 373 insertions(+), 172 deletions(-)

diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index c6108568..4f356a9b 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -12,10 +12,13 @@ use crate::{
     IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV,
     configs::{config_provider::ConfigProviderKind, server::ServerConfig, 
system::SystemConfig},
     server_error::ServerError,
-    shard::{transmission::connector::ShardConnector, 
transmission::frame::ShardFrame},
-    streaming::users::user::User,
+    shard::transmission::{connector::ShardConnector, frame::ShardFrame},
+    streaming::{
+        persistence::persister::{FilePersister, FileWithSyncPersister, 
PersisterKind},
+        users::user::User,
+    },
 };
-use std::{env, fs::remove_dir_all, ops::Range, path::Path};
+use std::{env, fs::remove_dir_all, ops::Range, path::Path, sync::Arc};
 
 pub fn create_shard_connections(shards_set: Range<usize>) -> 
Vec<ShardConnector<ShardFrame>> {
     let shards_count = shards_set.len();
@@ -68,37 +71,7 @@ pub async fn create_directories(config: &SystemConfig) -> 
Result<(), IggyError>
 
     // TODO: Move this to individual shard level
     /*
-    let state_entries = self.state.init().await.with_error_context(|error| {
-        format!("{COMPONENT} (error: {error}) - failed to initialize state 
entries")
-    })?;
-    let system_state = SystemState::init(state_entries)
-        .await
-        .with_error_context(|error| {
-            format!("{COMPONENT} (error: {error}) - failed to initialize 
system state")
-        })?;
-    let now = Instant::now();
-    self.load_version().await.with_error_context(|error| {
-        format!("{COMPONENT} (error: {error}) - failed to load version")
-    })?;
-    self.load_users(system_state.users.into_values().collect())
-        .await
-        .with_error_context(|error| {
-            format!("{COMPONENT} (error: {error}) - failed to load users")
-        })?;
-    self.load_streams(system_state.streams.into_values().collect())
-        .await
-        .with_error_context(|error| {
-            format!("{COMPONENT} (error: {error}) - failed to load streams")
-        })?;
-    if let Some(archiver) = self.archiver.as_ref() {
-        archiver
-            .init()
-            .await
-            .expect("Failed to initialize archiver");
-    }
-    info!("Initialized system in {} ms.", now.elapsed().as_millis());
-    Ok(())
-    */
+     */
 }
 
 pub fn create_root_user() -> User {
@@ -158,3 +131,10 @@ pub fn create_shard_executor() -> 
Runtime<TimeDriver<monoio::IoUringDriver>> {
     let rt = Buildable::build(builder).expect("Failed to create default 
runtime");
     rt
 }
+
+pub fn resolve_persister(enforce_fsync: bool) -> Arc<PersisterKind> {
+    match enforce_fsync {
+        true => Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister)),
+        false => Arc::new(PersisterKind::File(FilePersister)),
+    }
+}
diff --git a/core/server/src/lib.rs b/core/server/src/lib.rs
index e64837da..3dcf64b2 100644
--- a/core/server/src/lib.rs
+++ b/core/server/src/lib.rs
@@ -43,11 +43,11 @@ pub mod streaming;
 pub mod tcp;
 pub mod versioning;
 
-const VERSION: &str = env!("CARGO_PKG_VERSION");
-const IGGY_ROOT_USERNAME_ENV: &str = "IGGY_ROOT_USERNAME";
-const IGGY_ROOT_PASSWORD_ENV: &str = "IGGY_ROOT_PASSWORD";
+pub const VERSION: &str = env!("CARGO_PKG_VERSION");
+pub const IGGY_ROOT_USERNAME_ENV: &str = "IGGY_ROOT_USERNAME";
+pub const IGGY_ROOT_PASSWORD_ENV: &str = "IGGY_ROOT_PASSWORD";
 
-pub(crate) fn map_toggle_str<'a>(enabled: bool) -> &'a str {
+pub fn map_toggle_str<'a>(enabled: bool) -> &'a str {
     match enabled {
         true => "enabled",
         false => "disabled",
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 67728862..47a23f47 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -16,6 +16,7 @@
  * under the License.
  */
 
+use std::sync::Arc;
 use std::thread::available_parallelism;
 
 use anyhow::Result;
@@ -23,33 +24,32 @@ use clap::Parser;
 use dotenvy::dotenv;
 use error_set::ErrContext;
 use figlet_rs::FIGfont;
+use iggy_common::create_user::CreateUser;
+use iggy_common::{Aes256GcmEncryptor, EncryptorKind, IggyError};
 use server::args::Args;
 use server::bootstrap::{
     create_default_executor, create_directories, create_root_user, 
create_shard_connections,
-    create_shard_executor, load_config,
+    create_shard_executor, load_config, resolve_persister,
 };
-use server::channels::commands::archive_state::ArchiveStateExecutor;
-use 
server::channels::commands::clean_personal_access_tokens::CleanPersonalAccessTokensExecutor;
-use server::channels::commands::maintain_messages::MaintainMessagesExecutor;
-use server::channels::commands::print_sysinfo::SysInfoPrintExecutor;
-use server::channels::commands::save_messages::SaveMessagesExecutor;
-use server::channels::commands::verify_heartbeats::VerifyHeartbeatsExecutor;
-use server::channels::handler::BackgroundServerCommandHandler;
-use server::configs::config_provider::{self, ConfigProviderKind};
-use server::configs::server::ServerConfig;
-use server::http::http_server;
+use server::configs::config_provider::{self};
 #[cfg(not(feature = "tokio-console"))]
 use server::log::logger::Logging;
 #[cfg(feature = "tokio-console")]
 use server::log::tokio_console::Logging;
-use server::quic::quic_server;
 use server::server_error::ServerError;
 use server::shard::IggyShard;
-use server::streaming::systems::system::{SharedSystem, System};
-use server::streaming::utils::MemoryPool;
-use server::tcp::tcp_server;
+use server::state::StateKind;
+use server::state::command::EntryCommand;
+use server::state::file::FileState;
+use server::state::models::CreateUserWithId;
+use server::state::system::SystemState;
+use server::streaming::utils::gate::Gate;
+use server::versioning::SemanticVersion;
+use server::{IGGY_ROOT_USERNAME_ENV, map_toggle_str};
 use tokio::time::Instant;
-use tracing::{info, instrument};
+use tracing::{error, info, instrument};
+
+const COMPONENT: &str = "MAIN";
 
 #[instrument(skip_all, name = "trace_start_server")]
 fn main() -> Result<(), ServerError> {
@@ -88,7 +88,11 @@ fn main() -> Result<(), ServerError> {
         config
     })?;
 
-    // Create directories and root user.
+    // Initialize logging
+    let mut logging = Logging::new(config.telemetry.clone());
+    logging.early_init();
+
+    // Create directories.
     // Remove `local_data` directory if run with `--fresh` flag.
     std::thread::scope(|scope| {
         scope
@@ -117,13 +121,9 @@ fn main() -> Result<(), ServerError> {
                 })
             })
             .join()
-            .expect("Failed to create directories and root user")
+            .expect("Failed join thread")
     })
-    .with_error_context(|err| format!("Failed to create directories, err: 
{err}"))?;
-
-    // Initialize logging
-    let mut logging = Logging::new(config.telemetry.clone());
-    logging.early_init();
+    .with_error_context(|err| format!("Failed to init server: {err}"))?;
 
     // TODO: Make this configurable from config as a range
     // for example this instance of Iggy will use cores from 0..4
@@ -132,9 +132,11 @@ fn main() -> Result<(), ServerError> {
     let shards_set = 0..shards_count;
     let connections = create_shard_connections(shards_set.clone());
     for shard_id in shards_set {
+        let gate: Arc<Gate<()>> = Arc::new(Gate::new());
         let id = shard_id as u16;
         let connections = connections.clone();
-        let server_config = config.clone();
+        let config = config.clone();
+        let state_persister = 
resolve_persister(config.system.state.enforce_fsync);
         std::thread::Builder::new()
             .name(format!("shard-{id}"))
             .spawn(move || {
@@ -143,11 +145,97 @@ fn main() -> Result<(), ServerError> {
 
                 let mut rt = create_shard_executor();
                 rt.block_on(async move {
+                    let version = SemanticVersion::current().expect("Invalid 
version");
+                    info!(
+                        "Server-side encryption is {}.",
+                        map_toggle_str(config.system.encryption.enabled)
+                    );
+                    let encryptor: Option<EncryptorKind> = match 
config.system.encryption.enabled {
+                        true => Some(EncryptorKind::Aes256Gcm(
+                            
Aes256GcmEncryptor::from_base64_key(&config.system.encryption.key)
+                                .unwrap(),
+                        )),
+                        false => None,
+                    };
+
+                    let state = StateKind::File(FileState::new(
+                        &config.system.get_state_messages_file_path(),
+                        &version,
+                        state_persister,
+                        encryptor.clone(),
+                    ));
+
+                    let gate = gate.clone();
+                    gate.with_async::<Result<(), IggyError>>(async 
|gate_state| {
+                        if let Some(_) = gate_state.inner() {
+                            return Ok(());
+                        }
+
+                        let state_entries = 
state.load_entries().await.with_error_context(|error| {
+                            format!(
+                                "{COMPONENT} (error: {error}) - failed to load 
state entries"
+                            )
+                        })?;
+                        let root_exists = state_entries
+                            .iter()
+                            .find(|entry| {
+                                entry
+                                    .command()
+                                    .and_then(|command| match command {
+                                        EntryCommand::CreateUser(payload)
+                                            if payload.command.username
+                                                == IGGY_ROOT_USERNAME_ENV =>
+                                        {
+                                            Ok(true)
+                                        }
+                                        _ => Ok(false),
+                                    })
+                                    .map_or_else(
+                                        |err| {
+                                            error!("Failed to check if root 
user exists: {err}");
+                                            false
+                                        },
+                                        |v| v,
+                                    )
+                            })
+                            .is_some();
+
+                        if !root_exists {
+                            info!("No users found, creating the root user...");
+                            let root = create_root_user();
+                            let command = CreateUser {
+                                username: root.username.clone(),
+                                password: root.password.clone(),
+                                status: root.status,
+                                permissions: root.permissions.clone(),
+                            };
+                            state
+                                .apply(0, 
&EntryCommand::CreateUser(CreateUserWithId {
+                                    user_id: root.id,
+                                    command
+                                }))
+                                .await
+                                .with_error_context(|error| {
+                                    format!(
+                                        "{COMPONENT} (error: {error}) - failed 
to apply create user command, username: {}",
+                                        root.username
+                                    )
+                                })?;
+                        }
+
+                        gate_state.set_result(());
+                        Ok(())
+                    })
+                    .await;
+
                     let builder = IggyShard::builder();
                     let mut shard = builder
                         .id(id)
                         .connections(connections)
-                        .server_config(server_config)
+                        .config(config)
+                        .encryptor(encryptor)
+                        .version(version)
+                        .state(state)
                         .build()
                         .await;
 
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index 36fe0e54..e796bd11 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -22,14 +22,12 @@ use iggy_common::{Aes256GcmEncryptor, EncryptorKind};
 use tracing::info;
 
 use crate::{
+    bootstrap::resolve_persister,
     configs::server::ServerConfig,
     map_toggle_str,
     shard::Shard,
     state::{StateKind, file::FileState},
-    streaming::{
-        persistence::persister::{FilePersister, FileWithSyncPersister, 
PersisterKind},
-        storage::SystemStorage,
-    },
+    streaming::storage::SystemStorage,
     versioning::SemanticVersion,
 };
 
@@ -40,6 +38,9 @@ pub struct IggyShardBuilder {
     id: Option<u16>,
     connections: Option<Vec<ShardConnector<ShardFrame>>>,
     config: Option<ServerConfig>,
+    encryptor: Option<EncryptorKind>,
+    version: Option<SemanticVersion>,
+    state: Option<StateKind>,
 }
 
 impl IggyShardBuilder {
@@ -53,16 +54,32 @@ impl IggyShardBuilder {
         self
     }
 
-    pub fn server_config(mut self, config: ServerConfig) -> Self {
+    pub fn config(mut self, config: ServerConfig) -> Self {
         self.config = Some(config);
         self
     }
 
+    pub fn encryptor(mut self, encryptor: Option<EncryptorKind>) -> Self {
+        self.encryptor = encryptor;
+        self
+    }
+
+    pub fn version(mut self, version: SemanticVersion) -> Self {
+        self.version = Some(version);
+        self
+    }
+
+    pub fn state(mut self, state: StateKind) -> Self {
+        self.state = Some(state);
+        self
+    }
+
     // TODO: Too much happens in there, some of those bootstrapping logic 
should be moved outside.
     pub async fn build(self) -> IggyShard {
         let id = self.id.unwrap();
         let config = self.config.unwrap();
         let connections = self.connections.unwrap();
+        let state = self.state.unwrap();
         let (stop_sender, stop_receiver, frame_receiver) = connections
             .iter()
             .filter(|c| c.id == id)
@@ -76,33 +93,7 @@ impl IggyShardBuilder {
             .next()
             .expect("Failed to find connection with the specified ID");
         let shards = connections.into_iter().map(Shard::new).collect();
-
-        //TODO: This can be discrete step in the builder bootstrapped from 
main function.
-        let version = SemanticVersion::current().expect("Invalid version");
-
-        //TODO: This can be discrete step in the builder bootstrapped from 
main function.
-        info!(
-            "Server-side encryption is {}.",
-            map_toggle_str(config.system.encryption.enabled)
-        );
-        let encryptor: Option<Arc<EncryptorKind>> = match 
config.system.encryption.enabled {
-            true => Some(Arc::new(EncryptorKind::Aes256Gcm(
-                
Aes256GcmEncryptor::from_base64_key(&config.system.encryption.key).unwrap(),
-            ))),
-            false => None,
-        };
-
-        //TODO: This can be discrete step in the builder bootstrapped from 
main function.
-        let state_persister = 
Self::resolve_persister(config.system.state.enforce_fsync);
-        let state = Rc::new(StateKind::File(FileState::new(
-            &config.system.get_state_messages_file_path(),
-            &version,
-            state_persister,
-            encryptor.clone(),
-        )));
-
-        //TODO: This can be discrete step in the builder bootstrapped from 
main function.
-        let partition_persister = 
Self::resolve_persister(config.system.partition.enforce_fsync);
+        let partition_persister = 
resolve_persister(config.system.partition.enforce_fsync);
         let storage = Rc::new(SystemStorage::new(
             config.system.clone(),
             partition_persister,
@@ -120,11 +111,4 @@ impl IggyShardBuilder {
             frame_receiver: Cell::new(Some(frame_receiver)),
         }
     }
-
-    fn resolve_persister(enforce_fsync: bool) -> Arc<PersisterKind> {
-        match enforce_fsync {
-            true => 
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister)),
-            false => Arc::new(PersisterKind::File(FilePersister)),
-        }
-    }
 }
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 6a6d35b8..2f0929bf 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -22,11 +22,13 @@ pub mod transmission;
 
 use ahash::HashMap;
 use builder::IggyShardBuilder;
+use error_set::ErrContext;
 use iggy_common::IggyError;
 use namespace::IggyNamespace;
 use std::{
     cell::{Cell, RefCell},
     rc::Rc,
+    str::FromStr,
     sync::Arc,
     time::Instant,
 };
@@ -37,9 +39,17 @@ use crate::{
     bootstrap::create_root_user,
     configs::server::ServerConfig,
     shard::transmission::frame::ShardFrame,
-    state::{StateKind, file::FileState},
-    streaming::storage::SystemStorage,
+    state::{
+        StateKind,
+        file::FileState,
+        system::{SystemState, UserState},
+    },
+    streaming::{storage::SystemStorage, systems::info::SystemInfo},
+    versioning::SemanticVersion,
 };
+
+pub const COMPONENT: &str = "SHARD";
+
 pub(crate) struct Shard {
     id: u16,
     connection: ShardConnector<ShardFrame>,
@@ -70,7 +80,7 @@ pub struct IggyShard {
     // TODO: Refactor.
     pub(crate) storage: Rc<SystemStorage>,
 
-    pub(crate) state: Rc<StateKind>,
+    pub(crate) state: StateKind,
     //pub(crate) encryptor: Option<Rc<dyn Encryptor>>,
     config: ServerConfig,
     //pub(crate) client_manager: RefCell<ClientManager>,
@@ -92,10 +102,12 @@ impl IggyShard {
         //let state_entries = self.state.init().await?;
         //let system_state = SystemState::init(state_entries).await?;
         //let user = create_root_user();
-        self.load_state().await;
-        self.load_users().await;
+        self.load_version().await?;
+        let state = self.load_state().await?;
+        //self.load_users().await;
         // Add default root user.
         self.load_streams().await;
+
         //TODO: Fix the archiver.
         /*
         if let Some(archiver) = self.archiver.as_ref() {
@@ -109,12 +121,106 @@ impl IggyShard {
         Ok(())
     }
 
-    async fn load_state(&self) {
-        todo!()
+    async fn load_version(&self) -> Result<(), IggyError> {
+        async fn update_system_info(
+            storage: &Rc<SystemStorage>,
+            system_info: &mut SystemInfo,
+            version: &SemanticVersion,
+        ) -> Result<(), IggyError> {
+            system_info.update_version(version);
+            storage.info.save(system_info).await?;
+            Ok(())
+        }
+
+        let current_version = SemanticVersion::current()?;
+        let mut system_info;
+        let load_system_info = self.storage.info.load().await;
+        if load_system_info.is_err() {
+            let error = load_system_info.err().unwrap();
+            if let IggyError::ResourceNotFound(_) = error {
+                info!("System info not found, creating...");
+                system_info = SystemInfo::default();
+                update_system_info(&self.storage, &mut system_info, 
&current_version).await?;
+            } else {
+                return Err(error);
+            }
+        } else {
+            system_info = load_system_info.unwrap();
+        }
+
+        info!("Loaded {system_info}.");
+        let loaded_version = 
SemanticVersion::from_str(&system_info.version.version)?;
+        if current_version.is_equal_to(&loaded_version) {
+            info!("System version {current_version} is up to date.");
+        } else if current_version.is_greater_than(&loaded_version) {
+            info!(
+                "System version {current_version} is greater than 
{loaded_version}, checking the available migrations..."
+            );
+            update_system_info(&self.storage, &mut system_info, 
&current_version).await?;
+        } else {
+            info!(
+                "System version {current_version} is lower than 
{loaded_version}, possible downgrade."
+            );
+            update_system_info(&self.storage, &mut system_info, 
&current_version).await?;
+        }
+
+        Ok(())
     }
 
-    async fn load_users(&self) {
-        todo!()
+    async fn load_state(&self) -> Result<SystemState, IggyError> {
+        let root = create_root_user();
+        let state_entries = self.state.init().await.with_error_context(|error| 
{
+            format!("{COMPONENT} (error: {error}) - failed to initialize state 
entries")
+        })?;
+        let system_state = SystemState::init(state_entries)
+            .await
+            .with_error_context(|error| {
+                format!("{COMPONENT} (error: {error}) - failed to initialize 
system state")
+            })?;
+        Ok(system_state)
+    }
+
+    async fn load_users(&mut self, users: Vec<UserState>) -> Result<(), 
IggyError> {
+        info!("Loading users...");
+        /*
+
+        for user_state in users.into_iter() {
+            let mut user = User::with_password(
+                user_state.id,
+                &user_state.username,
+                user_state.password_hash,
+                user_state.status,
+                user_state.permissions,
+            );
+
+            user.created_at = user_state.created_at;
+            user.personal_access_tokens = user_state
+                .personal_access_tokens
+                .into_values()
+                .map(|token| {
+                    (
+                        Arc::new(token.token_hash.clone()),
+                        PersonalAccessToken::raw(
+                            user_state.id,
+                            &token.name,
+                            &token.token_hash,
+                            token.expiry_at,
+                        ),
+                    )
+                })
+                .collect();
+            self.users.insert(user_state.id, user);
+        }
+
+        let users_count = self.users.len();
+        let current_user_id = self.users.keys().max().unwrap_or(&1);
+        USER_ID.store(current_user_id + 1, Ordering::SeqCst);
+        self.permissioner
+            .init(&self.users.values().collect::<Vec<&User>>());
+        self.metrics.increment_users(users_count as u32);
+        info!("Initialized {users_count} user(s).");
+        */
+        Ok(())
     }
 
     async fn load_streams(&self) {
diff --git a/core/server/src/shard/transmission/frame.rs 
b/core/server/src/shard/transmission/frame.rs
index ac60863d..ec63daf0 100644
--- a/core/server/src/shard/transmission/frame.rs
+++ b/core/server/src/shard/transmission/frame.rs
@@ -1,3 +1,20 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 use async_channel::Sender;
 use bytes::Bytes;
 use iggy_common::IggyError;
diff --git a/core/server/src/shard/transmission/message.rs 
b/core/server/src/shard/transmission/message.rs
index e9f1dd4b..3572c0ec 100644
--- a/core/server/src/shard/transmission/message.rs
+++ b/core/server/src/shard/transmission/message.rs
@@ -1,3 +1,20 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 use crate::binary::command::ServerCommand;
 
 #[derive(Debug)]
diff --git a/core/server/src/shard/transmission/mod.rs 
b/core/server/src/shard/transmission/mod.rs
index b871152a..1f9a79ed 100644
--- a/core/server/src/shard/transmission/mod.rs
+++ b/core/server/src/shard/transmission/mod.rs
@@ -1,3 +1,21 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 pub mod connector;
 pub mod frame;
 pub mod message;
diff --git a/core/server/src/state/file.rs b/core/server/src/state/file.rs
index f162ed75..aa14b501 100644
--- a/core/server/src/state/file.rs
+++ b/core/server/src/state/file.rs
@@ -47,7 +47,7 @@ pub struct FileState {
     version: u32,
     path: String,
     persister: Arc<PersisterKind>,
-    encryptor: Option<Arc<EncryptorKind>>,
+    encryptor: Option<EncryptorKind>,
 }
 
 impl FileState {
@@ -55,7 +55,7 @@ impl FileState {
         path: &str,
         version: &SemanticVersion,
         persister: Arc<PersisterKind>,
-        encryptor: Option<Arc<EncryptorKind>>,
+        encryptor: Option<EncryptorKind>,
     ) -> Self {
         Self {
             current_index: AtomicU64::new(0),
diff --git a/core/server/src/streaming/systems/info.rs 
b/core/server/src/streaming/systems/info.rs
index 61c1bcbb..3c20fd63 100644
--- a/core/server/src/streaming/systems/info.rs
+++ b/core/server/src/streaming/systems/info.rs
@@ -16,15 +16,11 @@
  * under the License.
  */
 
-use crate::streaming::systems::system::System;
 use crate::versioning::SemanticVersion;
-use iggy_common::IggyError;
 use serde::{Deserialize, Serialize};
 use std::collections::hash_map::DefaultHasher;
 use std::fmt::Display;
 use std::hash::{Hash, Hasher};
-use std::str::FromStr;
-use tracing::info;
 
 #[derive(Debug, Serialize, Deserialize, Default)]
 pub struct SystemInfo {
@@ -46,57 +42,6 @@ pub struct Migration {
     pub applied_at: u64,
 }
 
-impl System {
-    pub(crate) async fn load_version(&mut self) -> Result<(), IggyError> {
-        let current_version = SemanticVersion::current()?;
-        let mut system_info;
-        let load_system_info = self.storage.info.load().await;
-        if load_system_info.is_err() {
-            let error = load_system_info.err().unwrap();
-            if let IggyError::ResourceNotFound(_) = error {
-                info!("System info not found, creating...");
-                system_info = SystemInfo::default();
-                self.update_system_info(&mut system_info, &current_version)
-                    .await?;
-            } else {
-                return Err(error);
-            }
-        } else {
-            system_info = load_system_info.unwrap();
-        }
-
-        info!("Loaded {system_info}.");
-        let loaded_version = 
SemanticVersion::from_str(&system_info.version.version)?;
-        if current_version.is_equal_to(&loaded_version) {
-            info!("System version {current_version} is up to date.");
-        } else if current_version.is_greater_than(&loaded_version) {
-            info!(
-                "System version {current_version} is greater than 
{loaded_version}, checking the available migrations..."
-            );
-            self.update_system_info(&mut system_info, &current_version)
-                .await?;
-        } else {
-            info!(
-                "System version {current_version} is lower than 
{loaded_version}, possible downgrade."
-            );
-            self.update_system_info(&mut system_info, &current_version)
-                .await?;
-        }
-
-        Ok(())
-    }
-
-    async fn update_system_info(
-        &self,
-        system_info: &mut SystemInfo,
-        version: &SemanticVersion,
-    ) -> Result<(), IggyError> {
-        system_info.update_version(version);
-        self.storage.info.save(system_info).await?;
-        Ok(())
-    }
-}
-
 impl SystemInfo {
     pub fn update_version(&mut self, version: &SemanticVersion) {
         self.version.version = version.to_string();
diff --git a/core/server/src/streaming/systems/system.rs 
b/core/server/src/streaming/systems/system.rs
index 761f6d1b..b3eb3804 100644
--- a/core/server/src/streaming/systems/system.rs
+++ b/core/server/src/streaming/systems/system.rs
@@ -102,10 +102,10 @@ impl System {
             map_toggle_str(config.encryption.enabled)
         );
 
-        let encryptor: Option<Arc<EncryptorKind>> = match 
config.encryption.enabled {
-            true => Some(Arc::new(EncryptorKind::Aes256Gcm(
+        let encryptor: Option<EncryptorKind> = match config.encryption.enabled 
{
+            true => Some(EncryptorKind::Aes256Gcm(
                 
Aes256GcmEncryptor::from_base64_key(&config.encryption.key).unwrap(),
-            ))),
+            )),
             false => None,
         };
 
@@ -122,7 +122,7 @@ impl System {
             config.clone(),
             SystemStorage::new(config, partition_persister),
             state,
-            encryptor,
+            encryptor.map(Arc::new),
             data_maintenance_config,
             pat_config,
         )
@@ -224,9 +224,12 @@ impl System {
                 format!("{COMPONENT} (error: {error}) - failed to initialize 
system state")
             })?;
         let now = Instant::now();
+        //DONE
+        /*
         self.load_version().await.with_error_context(|error| {
             format!("{COMPONENT} (error: {error}) - failed to load version")
         })?;
+        */
         self.load_users(system_state.users.into_values().collect())
             .await
             .with_error_context(|error| {
diff --git a/core/server/src/streaming/utils/gate.rs 
b/core/server/src/streaming/utils/gate.rs
new file mode 100644
index 00000000..315250fa
--- /dev/null
+++ b/core/server/src/streaming/utils/gate.rs
@@ -0,0 +1,42 @@
+use std::sync::{Condvar, Mutex};
+
+#[derive(Default)]
+pub struct Gate<T> {
+    state: Mutex<GateState<T>>,
+}
+
+#[derive(Default)]
+pub struct GateState<T> {
+    result: Option<T>,
+}
+
+impl<T> GateState<T> {
+    pub fn set_result(&mut self, result: T) {
+        self.result = Some(result);
+    }
+
+    pub fn inner(&self) -> &Option<T> {
+        &self.result
+    }
+}
+
+impl<T> Gate<T>
+where
+    T: Default,
+{
+    pub fn new() -> Self {
+        Gate {
+            state: Default::default(),
+        }
+    }
+
+    pub async fn with_async<R>(&self, f: impl AsyncFnOnce(&mut GateState<T>) 
-> R) {
+        let mut guard = self.state.lock().unwrap();
+        f(&mut guard).await;
+    }
+
+    pub async fn with_once<R>(&self, f: impl FnOnce(&mut GateState<T>) -> R) {
+        let mut guard = self.state.lock().unwrap();
+        f(&mut guard);
+    }
+}
diff --git a/core/server/src/streaming/utils/mod.rs 
b/core/server/src/streaming/utils/mod.rs
index e441a051..1f1c731f 100644
--- a/core/server/src/streaming/utils/mod.rs
+++ b/core/server/src/streaming/utils/mod.rs
@@ -18,6 +18,7 @@
 
 pub mod crypto;
 pub mod file;
+pub mod gate;
 pub mod hash;
 pub mod head_tail_buf;
 pub mod random_id;

Reply via email to