This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/io_uring_tpc by this push:
     new 7607c9d1 feat(io_uring): dump config to file after shard starts (#2024)
7607c9d1 is described below

commit 7607c9d14bd18664e83b67c13b578781c495ad37
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Sat Jul 19 16:58:37 2025 +0200

    feat(io_uring): dump config to file after shard starts (#2024)
    
    Co-authored-by: Hubert Gruszecki <[email protected]>
---
 core/server/src/configs/sharding.rs         | 19 ++++++++--
 core/server/src/shard/builder.rs            |  7 ++--
 core/server/src/shard/mod.rs                |  8 +++++
 core/server/src/shard/transmission/event.rs |  3 ++
 core/server/src/tcp/mod.rs                  |  2 +-
 core/server/src/tcp/tcp_listener.rs         | 56 +++++++++++++++++++++++++++--
 core/server/src/tcp/tcp_server.rs           |  3 +-
 7 files changed, 86 insertions(+), 12 deletions(-)

diff --git a/core/server/src/configs/sharding.rs b/core/server/src/configs/sharding.rs
index 87679676..1accd214 100644
--- a/core/server/src/configs/sharding.rs
+++ b/core/server/src/configs/sharding.rs
@@ -16,7 +16,7 @@
  * under the License.
  */
 
-use serde::{Deserialize, Deserializer, Serialize};
+use serde::{Deserialize, Deserializer, Serialize, Serializer};
 use std::collections::HashSet;
 use std::str::FromStr;
 use std::thread::available_parallelism;
@@ -35,7 +35,7 @@ impl Default for ShardingConfig {
     }
 }
 
-#[derive(Debug, Clone, PartialEq, Serialize)]
+#[derive(Debug, Clone, PartialEq)]
 pub enum CpuAllocation {
     All,
     Count(usize),
@@ -92,6 +92,21 @@ impl FromStr for CpuAllocation {
     }
 }
 
+impl Serialize for CpuAllocation {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        match self {
+            CpuAllocation::All => serializer.serialize_str("all"),
+            CpuAllocation::Count(n) => serializer.serialize_u64(*n as u64),
+            CpuAllocation::Range(start, end) => {
+                serializer.serialize_str(&format!("{start}..{end}"))
+            }
+        }
+    }
+}
+
 impl<'de> Deserialize<'de> for CpuAllocation {
     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
     where
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index c49cdeb6..65c3592a 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -16,11 +16,7 @@
  * under the License.
  */
 
-use std::{
-    cell::Cell,
-    rc::Rc,
-    sync::{Arc, atomic::AtomicBool},
-};
+use std::{cell::Cell, rc::Rc, sync::atomic::AtomicBool};
 
 use iggy_common::{Aes256GcmEncryptor, EncryptorKind};
 use tracing::info;
@@ -130,6 +126,7 @@ impl IggyShardBuilder {
             metrics: Metrics::init(),
             task_registry: TaskRegistry::new(),
             is_shutting_down: AtomicBool::new(false),
+            tcp_bound_address: Cell::new(None),
 
             users: Default::default(),
             permissioner: Default::default(),
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 43280c77..9de75aa5 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -37,6 +37,7 @@ use namespace::IggyNamespace;
 use std::{
     cell::{Cell, RefCell},
     future::Future,
+    net::SocketAddr,
     pin::Pin,
     rc::Rc,
     str::FromStr,
@@ -159,6 +160,7 @@ pub struct IggyShard {
     pub(crate) stop_sender: StopSender,
     pub(crate) task_registry: TaskRegistry,
     pub(crate) is_shutting_down: AtomicBool,
+    pub(crate) tcp_bound_address: Cell<Option<SocketAddr>>,
 }
 
 impl IggyShard {
@@ -208,6 +210,7 @@ impl IggyShard {
             stop_sender,
             task_registry: TaskRegistry::new(),
             is_shutting_down: AtomicBool::new(false),
+            tcp_bound_address: Cell::new(None),
         };
         let user = User::root(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD);
         shard
@@ -850,6 +853,11 @@ impl IggyShard {
                 self.update_permissions_bypass_auth(user_id, permissions.to_owned())?;
                 Ok(())
             }
+            ShardEvent::TcpBound { address } => {
+                info!("Received TcpBound event with address: {}", address);
+                self.tcp_bound_address.set(Some(*address));
+                Ok(())
+            }
         }
     }
 
diff --git a/core/server/src/shard/transmission/event.rs b/core/server/src/shard/transmission/event.rs
index d2462e2f..67997242 100644
--- a/core/server/src/shard/transmission/event.rs
+++ b/core/server/src/shard/transmission/event.rs
@@ -130,4 +130,7 @@ pub enum ShardEvent {
         address: SocketAddr,
         transport: Transport,
     },
+    TcpBound {
+        address: SocketAddr,
+    },
 }
diff --git a/core/server/src/tcp/mod.rs b/core/server/src/tcp/mod.rs
index 3e2ab77d..3eb84c1c 100644
--- a/core/server/src/tcp/mod.rs
+++ b/core/server/src/tcp/mod.rs
@@ -21,7 +21,7 @@ pub mod sender;
 pub mod tcp_listener;
 pub mod tcp_sender;
 pub mod tcp_server;
-mod tcp_socket;
+pub mod tcp_socket;
 pub mod tcp_tls_listener;
 pub mod tcp_tls_sender;
 
diff --git a/core/server/src/tcp/tcp_listener.rs b/core/server/src/tcp/tcp_listener.rs
index 6ad2cf08..db0bcfaf 100644
--- a/core/server/src/tcp/tcp_listener.rs
+++ b/core/server/src/tcp/tcp_listener.rs
@@ -28,6 +28,7 @@ use futures::FutureExt;
 use iggy_common::IggyError;
 use std::net::SocketAddr;
 use std::rc::Rc;
+use std::sync::Arc;
 use std::time::Duration;
 use tracing::{error, info};
 
@@ -64,17 +65,68 @@ async fn create_listener(
 
 pub async fn start(
     server_name: &'static str,
-    addr: SocketAddr,
+    mut addr: SocketAddr,
     config: &TcpSocketConfig,
     shard: Rc<IggyShard>,
 ) -> Result<(), IggyError> {
+    if shard.id != 0 && addr.port() == 0 {
+        info!("Shard {} waiting for TCP address from shard 0...", shard.id);
+        loop {
+            if let Some(bound_addr) = shard.tcp_bound_address.get() {
+                addr = bound_addr;
+                info!("Shard {} received TCP address: {}", shard.id, addr);
+                break;
+            }
+            compio::time::sleep(Duration::from_millis(10)).await;
+        }
+    }
+
     let listener = create_listener(addr, config)
         .await
         .map_err(|_| IggyError::CannotBindToSocket(addr.to_string()))
         .with_error_context(|err| {
             format!("Failed to bind {server_name} server to address: {addr}, {err}")
         })?;
-    info!("{server_name} server has started on: {:?}", addr);
+    let actual_addr = listener.local_addr().map_err(|e| {
+        error!("Failed to get local address: {e}");
+        IggyError::CannotBindToSocket(addr.to_string())
+    })?;
+    info!("{server_name} server has started on: {:?}", actual_addr);
+
+    if shard.id == 0 {
+        if addr.port() == 0 {
+            let event = ShardEvent::TcpBound {
+                address: actual_addr,
+            };
+            shard.broadcast_event_to_all_shards(Arc::new(event)).await;
+        }
+
+        let mut current_config = shard.config.clone();
+        current_config.tcp.address = actual_addr.to_string();
+
+        let runtime_path = current_config.system.get_runtime_path();
+        let current_config_path = format!("{runtime_path}/current_config.toml");
+        let current_config_content =
+            toml::to_string(&current_config).expect("Cannot serialize current_config");
+
+        let buf_result = compio::fs::write(&current_config_path, current_config_content).await;
+        match buf_result.0 {
+            Ok(_) => info!("Current config written to: {}", current_config_path),
+            Err(e) => error!(
+                "Failed to write current config to {}: {}",
+                current_config_path, e
+            ),
+        }
+    }
+
+    accept_loop(server_name, listener, shard).await
+}
+
+async fn accept_loop(
+    server_name: &'static str,
+    listener: TcpListener,
+    shard: Rc<IggyShard>,
+) -> Result<(), IggyError> {
     loop {
         let shutdown_check = async {
             loop {
diff --git a/core/server/src/tcp/tcp_server.rs b/core/server/src/tcp/tcp_server.rs
index 6b299bff..22a48144 100644
--- a/core/server/src/tcp/tcp_server.rs
+++ b/core/server/src/tcp/tcp_server.rs
@@ -24,7 +24,6 @@ use std::rc::Rc;
 use tracing::info;
 
 /// Starts the TCP server.
-/// Returns the address the server is listening on.
 pub async fn spawn_tcp_server(shard: Rc<IggyShard>) -> Result<(), IggyError> {
     let server_name = if shard.config.tcp.tls.enabled {
         "Iggy TCP TLS"
@@ -41,7 +40,7 @@ pub async fn spawn_tcp_server(shard: Rc<IggyShard>) -> Result<(), IggyError> {
         .expect("Failed to parse TCP address");
     let socket = tcp_socket::build(ip_v6, socket_config);
     info!("Initializing {server_name} server...");
-    // TODO: Fixme -- storing addr of the server inside of the config for integration tests...
+
     match shard.config.tcp.tls.enabled {
         true => tcp_tls_listener::start(server_name, addr, socket, shard.clone()).await?,
         false => tcp_listener::start(server_name, addr, socket_config, shard.clone()).await?,

Reply via email to