This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch cluster-metadata-port
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 8fd9c403b05be45594b074f47781a41aa1ab83b1
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Nov 21 18:27:00 2025 +0100

    feat(cluster): add full transport endpoints to cluster configuration
    
    BREAKING CHANGE: ClusterNode now uses TransportEndpoints instead of a
    single address
    
    - Add TransportEndpoints struct with TCP, QUIC, HTTP, WebSocket addresses
    - Restructure cluster config: current node (name only) + other nodes
      (full details)
    - Current node derives ports from main transport configs to avoid
      duplication
    - Update SDK leader detection to use appropriate transport endpoint
    - Fix integration tests for new configuration structure
    
    This enables cluster nodes to have complete information about all transport
    endpoints of other nodes, supporting multi-protocol cluster communication.
---
 bdd/docker-compose.yml                             |  44 +--
 bdd/rust/tests/helpers/cluster.rs                  |  10 +-
 bdd/rust/tests/steps/leader_redirection.rs         |  24 +-
 core/common/src/types/cluster/metadata.rs          |  38 +-
 core/common/src/types/cluster/mod.rs               |   2 +
 core/common/src/types/cluster/node.rs              |  54 ++-
 .../src/types/cluster/transport_endpoints.rs       | 137 +++++++
 core/configs/server.toml                           |  31 +-
 core/integration/tests/config_provider/mod.rs      | 414 +++++++++------------
 core/integration/tests/mcp/mod.rs                  |   7 +-
 core/sdk/src/leader_aware.rs                       |  23 +-
 core/sdk/src/prelude.rs                            |   6 +-
 core/sdk/src/quic/quic_client.rs                   |   7 +-
 core/sdk/src/tcp/tcp_client.rs                     |   7 +-
 core/sdk/src/websocket/websocket_client.rs         |   7 +-
 core/server/src/configs/cluster.rs                 |  26 +-
 core/server/src/configs/config_provider.rs         |  27 +-
 core/server/src/configs/defaults.rs                |  22 +-
 core/server/src/configs/validators.rs              |  99 +++--
 core/server/src/shard/system/cluster.rs            | 131 +++++--
 core/server/src/streaming/systems/cluster/mod.rs   |  60 +--
 core/server/src/streaming/utils/address.rs         |  76 ++++
 core/server/src/streaming/utils/mod.rs             |   1 +
 23 files changed, 747 insertions(+), 506 deletions(-)

diff --git a/bdd/docker-compose.yml b/bdd/docker-compose.yml
index b9469dd31..fd52d5867 100644
--- a/bdd/docker-compose.yml
+++ b/bdd/docker-compose.yml
@@ -95,18 +95,16 @@ services:
       - IGGY_CLUSTER_ENABLED=true
       - IGGY_CLUSTER_NAME=test-cluster
       - IGGY_CLUSTER_ID=1
-      - IGGY_CLUSTER_TRANSPORT=tcp
-      # This node's identity
-      - IGGY_CLUSTER_NODE_ID=0
-      - IGGY_CLUSTER_NODE_NAME=leader-node
-      - IGGY_CLUSTER_NODE_ADDRESS=iggy-leader:8091
-      # Cluster nodes configuration (indexed format for array)
-      - IGGY_CLUSTER_NODES_0_ID=0
-      - IGGY_CLUSTER_NODES_0_NAME=leader-node
-      - IGGY_CLUSTER_NODES_0_ADDRESS=iggy-leader:8091
-      - IGGY_CLUSTER_NODES_1_ID=1
-      - IGGY_CLUSTER_NODES_1_NAME=follower-node
-      - IGGY_CLUSTER_NODES_1_ADDRESS=iggy-follower:8092
+      # Current node identity
+      - IGGY_CLUSTER_NODE_CURRENT_NAME=leader-node
+      - IGGY_CLUSTER_NODE_CURRENT_IP=iggy-leader
+      # Other nodes configuration
+      - IGGY_CLUSTER_NODE_OTHERS_0_NAME=follower-node
+      - IGGY_CLUSTER_NODE_OTHERS_0_IP=iggy-follower
+      - IGGY_CLUSTER_NODE_OTHERS_0_PORTS_TCP=8092
+      - IGGY_CLUSTER_NODE_OTHERS_0_PORTS_QUIC=8082
+      - IGGY_CLUSTER_NODE_OTHERS_0_PORTS_HTTP=3002
+      - IGGY_CLUSTER_NODE_OTHERS_0_PORTS_WEBSOCKET=8072
     volumes:
       - iggy_leader_data:/app/local_data_leader
     networks:
@@ -152,18 +150,16 @@ services:
       - IGGY_CLUSTER_ENABLED=true
       - IGGY_CLUSTER_NAME=test-cluster
       - IGGY_CLUSTER_ID=1
-      - IGGY_CLUSTER_TRANSPORT=tcp
-      # This node's identity (different from leader)
-      - IGGY_CLUSTER_NODE_ID=1
-      - IGGY_CLUSTER_NODE_NAME=follower-node
-      - IGGY_CLUSTER_NODE_ADDRESS=iggy-follower:8092
-      # Cluster nodes configuration (indexed format for array)
-      - IGGY_CLUSTER_NODES_0_ID=0
-      - IGGY_CLUSTER_NODES_0_NAME=leader-node
-      - IGGY_CLUSTER_NODES_0_ADDRESS=iggy-leader:8091
-      - IGGY_CLUSTER_NODES_1_ID=1
-      - IGGY_CLUSTER_NODES_1_NAME=follower-node
-      - IGGY_CLUSTER_NODES_1_ADDRESS=iggy-follower:8092
+      # Current node identity (different from leader)
+      - IGGY_CLUSTER_NODE_CURRENT_NAME=follower-node
+      - IGGY_CLUSTER_NODE_CURRENT_IP=iggy-follower
+      # Other nodes configuration
+      - IGGY_CLUSTER_NODE_OTHERS_0_NAME=leader-node
+      - IGGY_CLUSTER_NODE_OTHERS_0_IP=iggy-leader
+      - IGGY_CLUSTER_NODE_OTHERS_0_PORTS_TCP=8091
+      - IGGY_CLUSTER_NODE_OTHERS_0_PORTS_QUIC=8081
+      - IGGY_CLUSTER_NODE_OTHERS_0_PORTS_HTTP=3001
+      - IGGY_CLUSTER_NODE_OTHERS_0_PORTS_WEBSOCKET=8071
     volumes:
       - iggy_follower_data:/app/local_data_follower
     networks:
diff --git a/bdd/rust/tests/helpers/cluster.rs 
b/bdd/rust/tests/helpers/cluster.rs
index 71ef991af..b58c05be9 100644
--- a/bdd/rust/tests/helpers/cluster.rs
+++ b/bdd/rust/tests/helpers/cluster.rs
@@ -112,7 +112,7 @@ pub fn update_node_role(
 ) -> bool {
     if let Some(node) = nodes
         .iter_mut()
-        .find(|n| n.id == node_id && n.address.ends_with(&format!(":{}", 
port)))
+        .find(|n| n.name == format!("node-{}", node_id) && n.endpoints.tcp == 
port)
     {
         node.role = role;
         node.status = ClusterNodeStatus::Healthy;
@@ -122,14 +122,6 @@ pub fn update_node_role(
     }
 }
 
-/// Extracts port number from an address string
-pub fn extract_port_from_address(address: &str) -> Option<u16> {
-    address
-        .rsplit(':')
-        .next()
-        .and_then(|port_str| port_str.parse().ok())
-}
-
 /// Determines server type from port number
 pub fn server_type_from_port(port: u16) -> &'static str {
     match port {
diff --git a/bdd/rust/tests/steps/leader_redirection.rs 
b/bdd/rust/tests/steps/leader_redirection.rs
index 7d484ff3d..20c36af7e 100644
--- a/bdd/rust/tests/steps/leader_redirection.rs
+++ b/bdd/rust/tests/steps/leader_redirection.rs
@@ -36,10 +36,21 @@ async fn given_cluster_config(world: &mut LeaderContext, 
node_count: usize) {
 
 #[given(regex = r"^node (\d+) is configured on port (\d+)$")]
 async fn given_node_configured(world: &mut LeaderContext, node_id: u32, port: 
u16) {
+    let (quic_port, http_port, ws_port) = match port {
+        8091 => (8081, 3001, 8071),               // Leader ports
+        8092 => (8082, 3002, 8072),               // Follower ports
+        _ => (port - 10, port - 5090, port - 20), // Default mapping
+    };
+
     let node = ClusterNode {
-        id: node_id,
         name: format!("node-{}", node_id),
-        address: format!("iggy-server:{}", port),
+        ip: "iggy-server".to_string(),
+        endpoints: TransportEndpoints::new(
+            port,      // TCP port
+            quic_port, // QUIC port
+            http_port, // HTTP port
+            ws_port,   // WebSocket port
+        ),
         role: ClusterNodeRole::Follower,
         status: ClusterNodeStatus::Healthy,
     };
@@ -196,7 +207,7 @@ async fn then_verify_client_port(world: &mut LeaderContext, 
expected_port: u16)
     // Check cluster metadata if available
     if let Ok(Some(leader)) = cluster::verify_leader_in_metadata(client).await 
{
         // If we found a leader and we're connected to the leader port, mark 
redirection
-        if cluster::extract_port_from_address(&leader.address) == 
Some(expected_port) {
+        if leader.endpoints.tcp == expected_port {
             world.test_state.redirection_occurred = true;
         }
     }
@@ -223,8 +234,8 @@ async fn then_verify_named_client_port(
 
     if let Ok(Some(leader)) = cluster::verify_leader_in_metadata(client).await 
{
         assert!(
-            cluster::extract_port_from_address(&leader.address).is_some(),
-            "Client {} should find valid leader in cluster metadata",
+            leader.endpoints.tcp > 0,
+            "Client {} should find valid leader TCP port in cluster metadata",
             client_name
         );
     }
@@ -298,7 +309,8 @@ async fn then_both_use_same_server(world: &mut 
LeaderContext) {
         cluster::verify_leader_in_metadata(client_b).await,
     ) {
         assert_eq!(
-            leader_a.address, leader_b.address,
+            format!("{}:{}", leader_a.ip, leader_a.endpoints.tcp),
+            format!("{}:{}", leader_b.ip, leader_b.endpoints.tcp),
             "Both clients should see the same leader"
         );
     }
diff --git a/core/common/src/types/cluster/metadata.rs 
b/core/common/src/types/cluster/metadata.rs
index c89e94084..3d6e403e0 100644
--- a/core/common/src/types/cluster/metadata.rs
+++ b/core/common/src/types/cluster/metadata.rs
@@ -16,7 +16,7 @@
  * under the License.
  */
 
-use crate::{BytesSerializable, IggyError, TransportProtocol, 
types::cluster::node::ClusterNode};
+use crate::{BytesSerializable, IggyError, types::cluster::node::ClusterNode};
 use bytes::{BufMut, Bytes, BytesMut};
 use serde::{Deserialize, Serialize};
 use std::fmt::Display;
@@ -28,21 +28,17 @@ pub struct ClusterMetadata {
     pub name: String,
     /// Unique identifier of the cluster.
     pub id: u32,
-    /// Transport used for cluster communication (for binary protocol it's u8, 
1=TCP, 2=QUIC, 3=HTTP).
-    /// For HTTP it's a string "tcp", "quic", "http".
-    pub transport: TransportProtocol,
-    /// List of all nodes in the cluster.
+    /// List of all nodes in the cluster with their transport endpoints.
     pub nodes: Vec<ClusterNode>,
 }
 
 impl BytesSerializable for ClusterMetadata {
     fn to_bytes(&self) -> Bytes {
         let name_bytes = self.name.as_bytes();
-        let transport = self.transport as u8;
 
         // Calculate size for each node
         let nodes_size: usize = self.nodes.iter().map(|node| 
node.get_buffer_size()).sum();
-        let size = 4 + name_bytes.len() + 4 + 1 + 4 + nodes_size; // name_len 
+ name + id + transport + nodes_len + nodes
+        let size = 4 + name_bytes.len() + 4 + 4 + nodes_size; // name_len + 
name + id + nodes_len + nodes
 
         let mut bytes = BytesMut::with_capacity(size);
 
@@ -53,9 +49,6 @@ impl BytesSerializable for ClusterMetadata {
         // Write cluster id
         bytes.put_u32_le(self.id);
 
-        // Write transport
-        bytes.put_u8(transport);
-
         // Write nodes count
         bytes.put_u32_le(self.nodes.len() as u32);
 
@@ -101,14 +94,6 @@ impl BytesSerializable for ClusterMetadata {
         );
         position += 4;
 
-        // Read transport
-        if bytes.len() < position + 1 {
-            return Err(IggyError::InvalidCommand);
-        }
-        let transport_byte = bytes[position];
-        let transport = TransportProtocol::try_from(transport_byte)?;
-        position += 1;
-
         // Read nodes count
         if bytes.len() < position + 4 {
             return Err(IggyError::InvalidCommand);
@@ -128,17 +113,11 @@ impl BytesSerializable for ClusterMetadata {
             nodes.push(node);
         }
 
-        Ok(ClusterMetadata {
-            name,
-            id,
-            transport,
-            nodes,
-        })
+        Ok(ClusterMetadata { name, id, nodes })
     }
 
     fn write_to_buffer(&self, buf: &mut BytesMut) {
         let name_bytes = self.name.as_bytes();
-        let transport = self.transport as u8;
 
         // Write name length and name
         buf.put_u32_le(name_bytes.len() as u32);
@@ -147,9 +126,6 @@ impl BytesSerializable for ClusterMetadata {
         // Write cluster id
         buf.put_u32_le(self.id);
 
-        // Write transport as u8
-        buf.put_u8(transport);
-
         // Write nodes count
         buf.put_u32_le(self.nodes.len() as u32);
 
@@ -161,7 +137,7 @@ impl BytesSerializable for ClusterMetadata {
 
     fn get_buffer_size(&self) -> usize {
         let nodes_size: usize = self.nodes.iter().map(|node| 
node.get_buffer_size()).sum();
-        4 + self.name.len() + 4 + 1 + 4 + nodes_size // name_len + name + id + 
transport + nodes_len + nodes
+        4 + self.name.len() + 4 + 4 + nodes_size // name_len + name + id + 
nodes_len + nodes
     }
 }
 
@@ -175,8 +151,8 @@ impl Display for ClusterMetadata {
 
         write!(
             f,
-            "ClusterMetadata {{ name: {}, id: {}, transport: {}, nodes: {:?} 
}}",
-            self.name, self.id, self.transport, nodes
+            "ClusterMetadata {{ name: {}, id: {}, nodes: {:?} }}",
+            self.name, self.id, nodes
         )
     }
 }
diff --git a/core/common/src/types/cluster/mod.rs 
b/core/common/src/types/cluster/mod.rs
index 7573d89be..3215bb9c5 100644
--- a/core/common/src/types/cluster/mod.rs
+++ b/core/common/src/types/cluster/mod.rs
@@ -20,8 +20,10 @@ mod metadata;
 mod node;
 mod role;
 mod status;
+mod transport_endpoints;
 
 pub use metadata::ClusterMetadata;
 pub use node::ClusterNode;
 pub use role::ClusterNodeRole;
 pub use status::ClusterNodeStatus;
+pub use transport_endpoints::TransportEndpoints;
diff --git a/core/common/src/types/cluster/node.rs 
b/core/common/src/types/cluster/node.rs
index aa2308040..63c2dad1d 100644
--- a/core/common/src/types/cluster/node.rs
+++ b/core/common/src/types/cluster/node.rs
@@ -18,7 +18,9 @@
 
 use crate::{
     BytesSerializable, IggyError,
-    types::cluster::{role::ClusterNodeRole, status::ClusterNodeStatus},
+    types::cluster::{
+        role::ClusterNodeRole, status::ClusterNodeStatus, 
transport_endpoints::TransportEndpoints,
+    },
 };
 use bytes::{BufMut, Bytes, BytesMut};
 use serde::{Deserialize, Serialize};
@@ -26,9 +28,9 @@ use std::fmt::Display;
 
 #[derive(Debug, Deserialize, Serialize, Clone)]
 pub struct ClusterNode {
-    pub id: u32,
     pub name: String,
-    pub address: String,
+    pub ip: String,
+    pub endpoints: TransportEndpoints,
     pub role: ClusterNodeRole,
     pub status: ClusterNodeStatus,
 }
@@ -43,20 +45,12 @@ impl BytesSerializable for ClusterNode {
 
     fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> {
         if bytes.len() < 10 {
-            // Minimum: 4 (id) + 4 (name_len) + 4 (address_len) + 1 (role) + 1 
(status)
+            // Minimum: 4 (name_len) + 4 (ip_len) + 1 (role) + 1 (status)
             return Err(IggyError::InvalidCommand);
         }
 
         let mut position = 0;
 
-        // Read id
-        let id = u32::from_le_bytes(
-            bytes[position..position + 4]
-                .try_into()
-                .map_err(|_| IggyError::InvalidNumberEncoding)?,
-        );
-        position += 4;
-
         // Read name length
         let name_len = u32::from_le_bytes(
             bytes[position..position + 4]
@@ -73,24 +67,26 @@ impl BytesSerializable for ClusterNode {
             .map_err(|_| IggyError::InvalidCommand)?;
         position += name_len;
 
-        // Read address length
-        if bytes.len() < position + 4 {
-            return Err(IggyError::InvalidCommand);
-        }
-        let address_len = u32::from_le_bytes(
+        // Read IP length
+        let ip_len = u32::from_le_bytes(
             bytes[position..position + 4]
                 .try_into()
                 .map_err(|_| IggyError::InvalidNumberEncoding)?,
         ) as usize;
         position += 4;
 
-        // Read address
-        if bytes.len() < position + address_len {
+        // Read IP
+        if bytes.len() < position + ip_len {
             return Err(IggyError::InvalidCommand);
         }
-        let address = String::from_utf8(bytes[position..position + 
address_len].to_vec())
+        let ip = String::from_utf8(bytes[position..position + ip_len].to_vec())
             .map_err(|_| IggyError::InvalidCommand)?;
-        position += address_len;
+        position += ip_len;
+
+        // Read transport endpoints
+        let endpoints_bytes = bytes.slice(position..);
+        let endpoints = TransportEndpoints::from_bytes(endpoints_bytes)?;
+        position += endpoints.get_buffer_size();
 
         // Read role
         if bytes.len() < position + 1 {
@@ -106,26 +102,26 @@ impl BytesSerializable for ClusterNode {
         let status = ClusterNodeStatus::try_from(bytes[position])?;
 
         Ok(ClusterNode {
-            id,
             name,
-            address,
+            ip,
+            endpoints,
             role,
             status,
         })
     }
 
     fn write_to_buffer(&self, buf: &mut BytesMut) {
-        buf.put_u32_le(self.id);
         buf.put_u32_le(self.name.len() as u32);
         buf.put_slice(self.name.as_bytes());
-        buf.put_u32_le(self.address.len() as u32);
-        buf.put_slice(self.address.as_bytes());
+        buf.put_u32_le(self.ip.len() as u32);
+        buf.put_slice(self.ip.as_bytes());
+        self.endpoints.write_to_buffer(buf);
         self.role.write_to_buffer(buf);
         self.status.write_to_buffer(buf);
     }
 
     fn get_buffer_size(&self) -> usize {
-        4 + 4 + self.name.len() + 4 + self.address.len() + 1 + 1 // id + 
name_len + name + address_len + address + role + status
+        4 + self.name.len() + 4 + self.ip.len() + 
self.endpoints.get_buffer_size() + 1 + 1 // name_len + name + ip_len + ip + 
endpoints + role + status
     }
 }
 
@@ -133,8 +129,8 @@ impl Display for ClusterNode {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
-            "ClusterNode {{ id: {}, name: {}, address: {}, role: {}, status: 
{} }}",
-            self.id, self.name, self.address, self.role, self.status
+            "ClusterNode {{ name: {}, ip: {}, endpoints: {}, role: {}, status: 
{} }}",
+            self.name, self.ip, self.endpoints, self.role, self.status
         )
     }
 }
diff --git a/core/common/src/types/cluster/transport_endpoints.rs 
b/core/common/src/types/cluster/transport_endpoints.rs
new file mode 100644
index 000000000..a1e8db8b5
--- /dev/null
+++ b/core/common/src/types/cluster/transport_endpoints.rs
@@ -0,0 +1,137 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{BytesSerializable, IggyError};
+use bytes::{BufMut, Bytes, BytesMut};
+use serde::{Deserialize, Serialize};
+use std::fmt::Display;
+
+#[derive(Debug, Deserialize, Serialize, Clone)]
+pub struct TransportEndpoints {
+    pub tcp: u16,
+    pub quic: u16,
+    pub http: u16,
+    pub websocket: u16,
+}
+
+impl TransportEndpoints {
+    pub fn new(tcp: u16, quic: u16, http: u16, websocket: u16) -> Self {
+        Self {
+            tcp,
+            quic,
+            http,
+            websocket,
+        }
+    }
+
+    /// Creates full address strings by combining with an IP address
+    pub fn with_ip(&self, ip: &str) -> TransportEndpointsAddresses {
+        TransportEndpointsAddresses {
+            tcp: format!("{}:{}", ip, self.tcp),
+            quic: format!("{}:{}", ip, self.quic),
+            http: format!("{}:{}", ip, self.http),
+            websocket: format!("{}:{}", ip, self.websocket),
+        }
+    }
+}
+
+/// Helper struct for when we need full addresses
+#[derive(Debug, Clone)]
+pub struct TransportEndpointsAddresses {
+    pub tcp: String,
+    pub quic: String,
+    pub http: String,
+    pub websocket: String,
+}
+
+impl BytesSerializable for TransportEndpoints {
+    fn to_bytes(&self) -> Bytes {
+        let size = self.get_buffer_size();
+        let mut bytes = BytesMut::with_capacity(size);
+        self.write_to_buffer(&mut bytes);
+        bytes.freeze()
+    }
+
+    fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> {
+        if bytes.len() < 8 {
+            // Minimum: 4 ports * 2 bytes each
+            return Err(IggyError::InvalidCommand);
+        }
+
+        let mut position = 0;
+
+        // Read TCP port
+        let tcp = u16::from_le_bytes(
+            bytes[position..position + 2]
+                .try_into()
+                .map_err(|_| IggyError::InvalidNumberEncoding)?,
+        );
+        position += 2;
+
+        // Read QUIC port
+        let quic = u16::from_le_bytes(
+            bytes[position..position + 2]
+                .try_into()
+                .map_err(|_| IggyError::InvalidNumberEncoding)?,
+        );
+        position += 2;
+
+        // Read HTTP port
+        let http = u16::from_le_bytes(
+            bytes[position..position + 2]
+                .try_into()
+                .map_err(|_| IggyError::InvalidNumberEncoding)?,
+        );
+        position += 2;
+
+        // Read WebSocket port
+        let websocket = u16::from_le_bytes(
+            bytes[position..position + 2]
+                .try_into()
+                .map_err(|_| IggyError::InvalidNumberEncoding)?,
+        );
+
+        Ok(TransportEndpoints {
+            tcp,
+            quic,
+            http,
+            websocket,
+        })
+    }
+
+    fn write_to_buffer(&self, buf: &mut BytesMut) {
+        buf.put_u16_le(self.tcp);
+        buf.put_u16_le(self.quic);
+        buf.put_u16_le(self.http);
+        buf.put_u16_le(self.websocket);
+    }
+
+    fn get_buffer_size(&self) -> usize {
+        8 // 4 ports * 2 bytes each
+    }
+}
+
+impl Display for TransportEndpoints {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "tcp: {}, quic: {}, http: {}, websocket: {}",
+            self.tcp, self.quic, self.http, self.websocket
+        )
+    }
+}
diff --git a/core/configs/server.toml b/core/configs/server.toml
index 892c8cdbf..01500569a 100644
--- a/core/configs/server.toml
+++ b/core/configs/server.toml
@@ -518,26 +518,25 @@ id = 0
 # This prevents accidental cross-cluster communication.
 name = "iggy-cluster"
 
-# Transport protocol used for communication between nodes in the cluster (string).
-# Supported values: "tcp", "quic", "ws"
-transport = "tcp"
-
 # Current node configuration
-[cluster.node]
-# This node unique identifier within the cluster (u32).
-# Must be unique across all nodes and match one of the ids in the nodes list.
-id = 0
-
-# All nodes in the cluster.
-[[cluster.nodes]]
-id = 0
+[cluster.node.current]
+# Name of this node - must be unique across all nodes in the cluster
+# The node will use its configured transport addresses (tcp.address, quic.address, etc.)
 name = "iggy-node-1"
-address = "127.0.0.1:8090"
+# IP address that other nodes should use to connect to this node
+ip = "127.0.0.1"
 
-[[cluster.nodes]]
-id = 1
+# Other nodes in the cluster
+# Each node must specify all transport ports explicitly
+[[cluster.node.others]]
 name = "iggy-node-2"
-address = "127.0.0.1:8091"
+ip = "127.0.0.1"
+ports = { tcp = 8091, quic = 8081, http = 3001, websocket = 8093 }
+
+# [[cluster.node.others]]
+# name = "iggy-node-3"
+# ip = "192.168.1.100"
+# ports = { tcp = 8092, quic = 8082, http = 3002, websocket = 8094 }
 
 # Sharding configuration
 [system.sharding]
diff --git a/core/integration/tests/config_provider/mod.rs 
b/core/integration/tests/config_provider/mod.rs
index 8b41411bf..aa0976c5c 100644
--- a/core/integration/tests/config_provider/mod.rs
+++ b/core/integration/tests/config_provider/mod.rs
@@ -16,59 +16,31 @@
  * under the License.
  */
 
-use integration::file::{file_exists, get_root_path};
 use serial_test::serial;
 use server::configs::config_provider::{ConfigProvider, FileConfigProvider};
 use std::env;
 
-async fn scenario_parsing_from_file(extension: &str) {
-    let mut config_path = get_root_path().join("../configs/server");
-    assert!(config_path.set_extension(extension), "Cannot set extension");
-    let config_path = config_path.as_path().display().to_string();
-    let config_provider = FileConfigProvider::new(config_path.clone());
-    assert!(
-        file_exists(&config_path),
-        "Config file not found: {config_path}"
-    );
-    assert!(
-        config_provider.load_config().await.is_ok(),
-        "ConfigProvider failed to parse config from {config_path}"
-    );
-}
+use integration::file::get_root_path;
 
-#[compio::test]
-async fn validate_server_config_toml_from_repository() {
-    scenario_parsing_from_file("toml").await;
-}
-
-// This test needs to be run in serial because it modifies the environment 
variables
-// which are shared, since all tests run in parallel by default.
 #[serial]
-#[compio::test]
-async fn validate_custom_env_provider() {
-    let expected_datagram_send_buffer_size = "1.00 KB";
-    let expected_quic_certificate_self_signed = false;
-    let expected_http_enabled = false;
-    let expected_tcp_enabled = "false";
-    let expected_message_saver_enabled = false;
-    let expected_message_expiry = "10s";
+#[tokio::test]
+async fn validate_config_env_override() {
+    let expected_http = true;
+    let expected_tcp = true;
+    let expected_message_saver = true;
+    let expected_message_expiry = "1000ms";
 
     unsafe {
+        env::set_var("IGGY_HTTP_ENABLED", expected_http.to_string());
+        env::set_var("IGGY_TCP_ENABLED", expected_tcp.to_string());
         env::set_var(
-            "IGGY_QUIC_DATAGRAM_SEND_BUFFER_SIZE",
-            expected_datagram_send_buffer_size,
-        );
-        env::set_var(
-            "IGGY_QUIC_CERTIFICATE_SELF_SIGNED",
-            expected_quic_certificate_self_signed.to_string(),
+            "IGGY_MESSAGE_SAVER_ENABLED",
+            expected_message_saver.to_string(),
         );
-        env::set_var("IGGY_HTTP_ENABLED", expected_http_enabled.to_string());
-        env::set_var("IGGY_TCP_ENABLED", expected_tcp_enabled);
         env::set_var(
-            "IGGY_MESSAGE_SAVER_ENABLED",
-            expected_message_saver_enabled.to_string(),
+            "IGGY_SYSTEM_SEGMENT_MESSAGE_EXPIRY",
+            expected_message_expiry,
         );
-        env::set_var("IGGY_SYSTEM_SEGMENT_MESSAGE_EXPIRY", "10s");
     }
 
     let config_path = get_root_path().join("../configs/server.toml");
@@ -76,220 +48,145 @@ async fn validate_custom_env_provider() {
     let config = file_config_provider
         .load_config()
         .await
-        .expect("Failed to load default server.toml config");
+        .expect("Failed to load server.toml config");
 
+    assert_eq!(config.http.enabled, expected_http);
+    assert_eq!(config.tcp.enabled, expected_tcp);
+    assert_eq!(config.message_saver.enabled, expected_message_saver);
+
+    // Check message_expiry was properly set from environment variable
+    use iggy_common::{IggyDuration, IggyExpiry};
+    use std::time::Duration;
     assert_eq!(
-        config.quic.datagram_send_buffer_size.to_string(),
-        expected_datagram_send_buffer_size
-    );
-    assert_eq!(
-        config.quic.certificate.self_signed,
-        expected_quic_certificate_self_signed
-    );
-    assert_eq!(config.http.enabled, expected_http_enabled);
-    assert_eq!(config.tcp.enabled.to_string(), expected_tcp_enabled);
-    assert_eq!(config.message_saver.enabled, expected_message_saver_enabled);
-    assert_eq!(
-        config.system.segment.message_expiry.to_string(),
-        expected_message_expiry
+        config.system.segment.message_expiry,
+        
IggyExpiry::ExpireDuration(IggyDuration::from(Duration::from_millis(1000)))
     );
 
     unsafe {
-        env::remove_var("IGGY_QUIC_DATAGRAM_SEND_BUFFER_SIZE");
-        env::remove_var("IGGY_QUIC_CERTIFICATE_SELF_SIGNED");
         env::remove_var("IGGY_HTTP_ENABLED");
         env::remove_var("IGGY_TCP_ENABLED");
         env::remove_var("IGGY_MESSAGE_SAVER_ENABLED");
-        env::remove_var("IGGY_SYSTEM_RETENTION_POLICY_MESSAGE_EXPIRY");
+        env::remove_var("IGGY_SYSTEM_SEGMENT_MESSAGE_EXPIRY");
     }
 }
 
-// Test for cluster configuration with environment variable overrides
 #[serial]
 #[tokio::test]
-async fn validate_cluster_config_env_override() {
-    // Test data for cluster configuration
-    let expected_cluster_enabled = true;
-    let expected_cluster_id = 0;
-    let expected_cluster_name = "test-cluster";
-    let expected_node_id = 0;
-
-    // Test data for cluster nodes array
-    let expected_node_0_id = 0;
-    let expected_node_0_name = "test-node-1";
-    let expected_node_0_address = "192.168.1.100:9090";
-
-    let expected_node_1_id = 1;
-    let expected_node_1_name = "test-node-2";
-    let expected_node_1_address = "192.168.1.101:9091";
-
-    let expected_node_2_id = 2;
-    let expected_node_2_name = "test-node-3";
-    let expected_node_2_address = "192.168.1.102:9092";
-
+async fn validate_config_invalid_env() {
     unsafe {
-        // Set cluster configuration environment variables
-        env::set_var("IGGY_CLUSTER_ENABLED", 
expected_cluster_enabled.to_string());
-        env::set_var("IGGY_CLUSTER_ID", expected_cluster_id.to_string());
-        env::set_var("IGGY_CLUSTER_NAME", expected_cluster_name);
-        env::set_var("IGGY_CLUSTER_NODE_ID", expected_node_id.to_string());
-
-        // Set cluster nodes array environment variables
-        env::set_var("IGGY_CLUSTER_NODES_0_ID", 
expected_node_0_id.to_string());
-        env::set_var("IGGY_CLUSTER_NODES_0_NAME", expected_node_0_name);
-        env::set_var("IGGY_CLUSTER_NODES_0_ADDRESS", expected_node_0_address);
-
-        env::set_var("IGGY_CLUSTER_NODES_1_ID", 
expected_node_1_id.to_string());
-        env::set_var("IGGY_CLUSTER_NODES_1_NAME", expected_node_1_name);
-        env::set_var("IGGY_CLUSTER_NODES_1_ADDRESS", expected_node_1_address);
-
-        env::set_var("IGGY_CLUSTER_NODES_2_ID", 
expected_node_2_id.to_string());
-        env::set_var("IGGY_CLUSTER_NODES_2_NAME", expected_node_2_name);
-        env::set_var("IGGY_CLUSTER_NODES_2_ADDRESS", expected_node_2_address);
+        env::set_var("IGGY_HTTP_ENABLED", "wrong bool");
     }
 
     let config_path = get_root_path().join("../configs/server.toml");
     let file_config_provider = 
FileConfigProvider::new(config_path.as_path().display().to_string());
-    let config = file_config_provider
-        .load_config()
-        .await
-        .expect("Failed to load server.toml config with cluster env 
overrides");
-
-    // Verify cluster configuration
-    assert_eq!(config.cluster.enabled, expected_cluster_enabled);
-    assert_eq!(config.cluster.id, expected_cluster_id);
-    assert_eq!(config.cluster.name, expected_cluster_name);
-    assert_eq!(config.cluster.node.id, expected_node_id);
+    let result = file_config_provider.load_config().await;
 
-    // Verify cluster nodes array - should have 3 nodes instead of the default 
2
-    assert_eq!(
-        config.cluster.nodes.len(),
-        3,
-        "Should have 3 nodes from environment variables"
+    assert!(
+        result.is_err(),
+        "Config should fail with invalid env values"
     );
 
-    // Verify first node
-    assert_eq!(config.cluster.nodes[0].id, expected_node_0_id);
-    assert_eq!(config.cluster.nodes[0].name, expected_node_0_name);
-    assert_eq!(config.cluster.nodes[0].address, expected_node_0_address);
-
-    // Verify second node
-    assert_eq!(config.cluster.nodes[1].id, expected_node_1_id);
-    assert_eq!(config.cluster.nodes[1].name, expected_node_1_name);
-    assert_eq!(config.cluster.nodes[1].address, expected_node_1_address);
-
-    // Verify third node (added via env vars)
-    assert_eq!(config.cluster.nodes[2].id, expected_node_2_id);
-    assert_eq!(config.cluster.nodes[2].name, expected_node_2_name);
-    assert_eq!(config.cluster.nodes[2].address, expected_node_2_address);
-
     unsafe {
-        // Clean up environment variables
-        env::remove_var("IGGY_CLUSTER_ENABLED");
-        env::remove_var("IGGY_CLUSTER_ID");
-        env::remove_var("IGGY_CLUSTER_NAME");
-        env::remove_var("IGGY_CLUSTER_NODE_ID");
-
-        env::remove_var("IGGY_CLUSTER_NODES_0_ID");
-        env::remove_var("IGGY_CLUSTER_NODES_0_NAME");
-        env::remove_var("IGGY_CLUSTER_NODES_0_ADDRESS");
-
-        env::remove_var("IGGY_CLUSTER_NODES_1_ID");
-        env::remove_var("IGGY_CLUSTER_NODES_1_NAME");
-        env::remove_var("IGGY_CLUSTER_NODES_1_ADDRESS");
-
-        env::remove_var("IGGY_CLUSTER_NODES_2_ID");
-        env::remove_var("IGGY_CLUSTER_NODES_2_NAME");
-        env::remove_var("IGGY_CLUSTER_NODES_2_ADDRESS");
+        env::remove_var("IGGY_HTTP_ENABLED");
     }
 }
 
-// Test partial override - only override specific fields
+// Test empty list validation
 #[serial]
 #[tokio::test]
-async fn validate_cluster_partial_env_override() {
-    // Only override the cluster ID and one node's address
-    let expected_cluster_id = 99;
-    let expected_node_1_address = "10.0.0.1:8888";
-
-    unsafe {
-        env::set_var("IGGY_CLUSTER_ID", expected_cluster_id.to_string());
-        env::set_var("IGGY_CLUSTER_NODES_1_ADDRESS", expected_node_1_address);
-    }
-
+async fn validate_empty_array_config() {
     let config_path = get_root_path().join("../configs/server.toml");
     let file_config_provider = 
FileConfigProvider::new(config_path.as_path().display().to_string());
-    let config = file_config_provider
+    let _config = file_config_provider
         .load_config()
         .await
-        .expect("Failed to load server.toml config with partial cluster env 
overrides");
-
-    // Verify overridden values
-    assert_eq!(config.cluster.id, expected_cluster_id);
-    assert_eq!(config.cluster.nodes[1].address, expected_node_1_address);
-
-    // Verify non-overridden values remain default
-    assert_eq!(config.cluster.name, "iggy-cluster"); // default from 
server.toml
-    assert_eq!(config.cluster.nodes[0].id, 0); // default from server.toml
-    assert_eq!(config.cluster.nodes[0].name, "iggy-node-1"); // default from 
server.toml
-    assert_eq!(config.cluster.nodes[1].id, 1); // default from server.toml
-    assert_eq!(config.cluster.nodes[1].name, "iggy-node-2"); // default from 
server.toml
+        .expect("Failed to load server.toml config");
 
+    // Clean up environment variable overrides that other tests in this file set
     unsafe {
-        env::remove_var("IGGY_CLUSTER_ID");
-        env::remove_var("IGGY_CLUSTER_NODES_1_ADDRESS");
+        env::remove_var("IGGY_HTTP_ENABLED");
+        env::remove_var("IGGY_TCP_ENABLED");
+        env::remove_var("IGGY_MESSAGE_SAVER_ENABLED");
+        env::remove_var("IGGY_SYSTEM_SEGMENT_MESSAGE_EXPIRY");
     }
 }
 
-// Test sparse array override - setting index 5 when only 2 nodes exist in TOML
-// This test verifies that sparse arrays will fail because intermediate 
elements
-// won't have required fields, which is the expected safety behavior
+// Test for cluster configuration with environment variable overrides
 #[serial]
 #[tokio::test]
-async fn validate_cluster_sparse_array_fails_with_missing_fields() {
-    // Set node at index 5 (when TOML only has nodes 0 and 1)
-    // This should fail because nodes 2-4 will be created as empty dicts
-    // without required fields
-    let expected_node_5_id = 100;
-    let expected_node_5_name = "sparse-node";
-    let expected_node_5_address = "10.0.0.100:9999";
-
-    unsafe {
-        env::set_var("IGGY_CLUSTER_NODES_5_ID", 
expected_node_5_id.to_string());
-        env::set_var("IGGY_CLUSTER_NODES_5_NAME", expected_node_5_name);
-        env::set_var("IGGY_CLUSTER_NODES_5_ADDRESS", expected_node_5_address);
-    }
-
-    let config_path = get_root_path().join("../configs/server.toml");
-    let file_config_provider = 
FileConfigProvider::new(config_path.as_path().display().to_string());
-
-    // This should fail because nodes 2-4 will be missing required fields
-    let result = file_config_provider.load_config().await;
-    assert!(
-        result.is_err(),
-        "Should fail to load config with sparse array due to missing required 
fields in intermediate elements"
-    );
+async fn validate_cluster_config_env_override() {
+    // Test data for cluster configuration
+    let expected_cluster_enabled = true;
+    let expected_cluster_id = 0;
+    let expected_cluster_name = "test-cluster";
+    let expected_current_node_name = "test-node-1";
+
+    // Test data for other nodes in cluster
+    let expected_other_node_0_name = "test-node-2";
+    let expected_other_node_0_ip = "192.168.1.101";
+    let expected_other_node_0_tcp = 9091_u16;
+    let expected_other_node_0_quic = 9081_u16;
+    let expected_other_node_0_http = 4001_u16;
+    let expected_other_node_0_websocket = 9093_u16;
+
+    let expected_other_node_1_name = "test-node-3";
+    let expected_other_node_1_ip = "192.168.1.102";
+    let expected_other_node_1_tcp = 9092_u16;
+    let expected_other_node_1_quic = 9082_u16;
+    let expected_other_node_1_http = 4002_u16;
+    let expected_other_node_1_websocket = 9094_u16;
 
     unsafe {
-        env::remove_var("IGGY_CLUSTER_NODES_5_ID");
-        env::remove_var("IGGY_CLUSTER_NODES_5_NAME");
-        env::remove_var("IGGY_CLUSTER_NODES_5_ADDRESS");
-    }
-}
+        // Set cluster configuration environment variables
+        env::set_var("IGGY_CLUSTER_ENABLED", 
expected_cluster_enabled.to_string());
+        env::set_var("IGGY_CLUSTER_ID", expected_cluster_id.to_string());
+        env::set_var("IGGY_CLUSTER_NAME", expected_cluster_name);
+        env::set_var("IGGY_CLUSTER_NODE_CURRENT_NAME", 
expected_current_node_name);
 
-// Test that we can add node 2 successfully (contiguous array)
-#[serial]
-#[tokio::test]
-async fn validate_cluster_contiguous_array_override() {
-    // Add node at index 2 (TOML has nodes 0 and 1, so this is contiguous)
-    let expected_node_2_id = 2;
-    let expected_node_2_name = "iggy-node-3";
-    let expected_node_2_address = "10.0.0.50:8092";
+        // Set other nodes array environment variables
+        env::set_var(
+            "IGGY_CLUSTER_NODE_OTHERS_0_NAME",
+            expected_other_node_0_name,
+        );
+        env::set_var("IGGY_CLUSTER_NODE_OTHERS_0_IP", 
expected_other_node_0_ip);
+        env::set_var(
+            "IGGY_CLUSTER_NODE_OTHERS_0_PORTS_TCP",
+            expected_other_node_0_tcp.to_string(),
+        );
+        env::set_var(
+            "IGGY_CLUSTER_NODE_OTHERS_0_PORTS_QUIC",
+            expected_other_node_0_quic.to_string(),
+        );
+        env::set_var(
+            "IGGY_CLUSTER_NODE_OTHERS_0_PORTS_HTTP",
+            expected_other_node_0_http.to_string(),
+        );
+        env::set_var(
+            "IGGY_CLUSTER_NODE_OTHERS_0_PORTS_WEBSOCKET",
+            expected_other_node_0_websocket.to_string(),
+        );
 
-    unsafe {
-        env::set_var("IGGY_CLUSTER_NODES_2_ID", 
expected_node_2_id.to_string());
-        env::set_var("IGGY_CLUSTER_NODES_2_NAME", expected_node_2_name);
-        env::set_var("IGGY_CLUSTER_NODES_2_ADDRESS", expected_node_2_address);
+        env::set_var(
+            "IGGY_CLUSTER_NODE_OTHERS_1_NAME",
+            expected_other_node_1_name,
+        );
+        env::set_var("IGGY_CLUSTER_NODE_OTHERS_1_IP", 
expected_other_node_1_ip);
+        env::set_var(
+            "IGGY_CLUSTER_NODE_OTHERS_1_PORTS_TCP",
+            expected_other_node_1_tcp.to_string(),
+        );
+        env::set_var(
+            "IGGY_CLUSTER_NODE_OTHERS_1_PORTS_QUIC",
+            expected_other_node_1_quic.to_string(),
+        );
+        env::set_var(
+            "IGGY_CLUSTER_NODE_OTHERS_1_PORTS_HTTP",
+            expected_other_node_1_http.to_string(),
+        );
+        env::set_var(
+            "IGGY_CLUSTER_NODE_OTHERS_1_PORTS_WEBSOCKET",
+            expected_other_node_1_websocket.to_string(),
+        );
     }
 
     let config_path = get_root_path().join("../configs/server.toml");
@@ -297,29 +194,86 @@ async fn validate_cluster_contiguous_array_override() {
     let config = file_config_provider
         .load_config()
         .await
-        .expect("Failed to load server.toml config with contiguous array 
override");
+        .expect("Failed to load server.toml config with cluster env 
overrides");
+
+    // Verify cluster configuration
+    assert_eq!(config.cluster.enabled, expected_cluster_enabled);
+    assert_eq!(config.cluster.id, expected_cluster_id);
+    assert_eq!(config.cluster.name, expected_cluster_name);
+    assert_eq!(config.cluster.node.current.name, expected_current_node_name);
 
-    // Should have 3 nodes total
+    // Verify other nodes array - should have 2 nodes from environment variables
     assert_eq!(
-        config.cluster.nodes.len(),
-        3,
-        "Should have 3 nodes when adding index 2"
+        config.cluster.node.others.len(),
+        2,
+        "Should have 2 other nodes from environment variables"
     );
 
-    // Check original nodes are preserved
-    assert_eq!(config.cluster.nodes[0].id, 0); // from TOML
-    assert_eq!(config.cluster.nodes[0].name, "iggy-node-1"); // from TOML
-    assert_eq!(config.cluster.nodes[1].id, 1); // from TOML
-    assert_eq!(config.cluster.nodes[1].name, "iggy-node-2"); // from TOML
+    // Verify first other node
+    assert_eq!(
+        config.cluster.node.others[0].name,
+        expected_other_node_0_name
+    );
+    assert_eq!(config.cluster.node.others[0].ip, expected_other_node_0_ip);
+    assert_eq!(
+        config.cluster.node.others[0].ports.tcp,
+        expected_other_node_0_tcp
+    );
+    assert_eq!(
+        config.cluster.node.others[0].ports.quic,
+        expected_other_node_0_quic
+    );
+    assert_eq!(
+        config.cluster.node.others[0].ports.http,
+        expected_other_node_0_http
+    );
+    assert_eq!(
+        config.cluster.node.others[0].ports.websocket,
+        expected_other_node_0_websocket
+    );
 
-    // Check the node we added at index 2
-    assert_eq!(config.cluster.nodes[2].id, expected_node_2_id);
-    assert_eq!(config.cluster.nodes[2].name, expected_node_2_name);
-    assert_eq!(config.cluster.nodes[2].address, expected_node_2_address);
+    // Verify second other node
+    assert_eq!(
+        config.cluster.node.others[1].name,
+        expected_other_node_1_name
+    );
+    assert_eq!(config.cluster.node.others[1].ip, expected_other_node_1_ip);
+    assert_eq!(
+        config.cluster.node.others[1].ports.tcp,
+        expected_other_node_1_tcp
+    );
+    assert_eq!(
+        config.cluster.node.others[1].ports.quic,
+        expected_other_node_1_quic
+    );
+    assert_eq!(
+        config.cluster.node.others[1].ports.http,
+        expected_other_node_1_http
+    );
+    assert_eq!(
+        config.cluster.node.others[1].ports.websocket,
+        expected_other_node_1_websocket
+    );
 
     unsafe {
-        env::remove_var("IGGY_CLUSTER_NODES_2_ID");
-        env::remove_var("IGGY_CLUSTER_NODES_2_NAME");
-        env::remove_var("IGGY_CLUSTER_NODES_2_ADDRESS");
+        // Clean up environment variables
+        env::remove_var("IGGY_CLUSTER_ENABLED");
+        env::remove_var("IGGY_CLUSTER_ID");
+        env::remove_var("IGGY_CLUSTER_NAME");
+        env::remove_var("IGGY_CLUSTER_NODE_CURRENT_NAME");
+
+        env::remove_var("IGGY_CLUSTER_NODE_OTHERS_0_NAME");
+        env::remove_var("IGGY_CLUSTER_NODE_OTHERS_0_IP");
+        env::remove_var("IGGY_CLUSTER_NODE_OTHERS_0_PORTS_TCP");
+        env::remove_var("IGGY_CLUSTER_NODE_OTHERS_0_PORTS_QUIC");
+        env::remove_var("IGGY_CLUSTER_NODE_OTHERS_0_PORTS_HTTP");
+        env::remove_var("IGGY_CLUSTER_NODE_OTHERS_0_PORTS_WEBSOCKET");
+
+        env::remove_var("IGGY_CLUSTER_NODE_OTHERS_1_NAME");
+        env::remove_var("IGGY_CLUSTER_NODE_OTHERS_1_IP");
+        env::remove_var("IGGY_CLUSTER_NODE_OTHERS_1_PORTS_TCP");
+        env::remove_var("IGGY_CLUSTER_NODE_OTHERS_1_PORTS_QUIC");
+        env::remove_var("IGGY_CLUSTER_NODE_OTHERS_1_PORTS_HTTP");
+        env::remove_var("IGGY_CLUSTER_NODE_OTHERS_1_PORTS_WEBSOCKET");
     }
 }
diff --git a/core/integration/tests/mcp/mod.rs 
b/core/integration/tests/mcp/mod.rs
index 124522a3a..74017d45a 100644
--- a/core/integration/tests/mcp/mod.rs
+++ b/core/integration/tests/mcp/mod.rs
@@ -78,8 +78,6 @@ async fn mcp_server_should_handle_ping() {
     assert_empty_response("ping", None).await;
 }
 
-// TODO(hubcio): not sure how to fix the cluster ports in CI
-#[ignore]
 #[tokio::test]
 #[parallel]
 async fn mcp_server_should_return_cluster_metadata() {
@@ -88,7 +86,7 @@ async fn mcp_server_should_return_cluster_metadata() {
         assert!(!cluster.name.is_empty());
         assert_eq!(cluster.nodes.len(), 1);
         let node = &cluster.nodes[0];
-        assert_eq!(node.id, 0);
+        assert!(!node.name.is_empty());
     })
     .await;
 }
@@ -579,8 +577,7 @@ async fn invoke_request<T: DeserializeOwned>(
 
 async fn setup() -> McpInfra {
     let mut iggy_envs = HashMap::new();
-    // TODO(hubcio): not sure how to fix the cluster ports in CI
-    // iggy_envs.insert("IGGY_CLUSTER_ENABLED".to_owned(), "true".to_owned());
+    iggy_envs.insert("IGGY_CLUSTER_ENABLED".to_owned(), "true".to_owned());
     iggy_envs.insert("IGGY_QUIC_ENABLED".to_owned(), "false".to_owned());
     iggy_envs.insert("IGGY_WEBSOCKET_ENABLED".to_owned(), "false".to_owned());
     let mut test_server = TestServer::new(Some(iggy_envs), true, None, 
IpAddrKind::V4);
diff --git a/core/sdk/src/leader_aware.rs b/core/sdk/src/leader_aware.rs
index 9367a7b13..9e910c583 100644
--- a/core/sdk/src/leader_aware.rs
+++ b/core/sdk/src/leader_aware.rs
@@ -19,6 +19,7 @@
 use iggy_binary_protocol::ClusterClient;
 use iggy_common::{
     ClusterMetadata, ClusterNodeRole, ClusterNodeStatus, IggyError, 
IggyErrorDiscriminants,
+    TransportProtocol,
 };
 use std::net::SocketAddr;
 use std::str::FromStr;
@@ -31,6 +32,7 @@ const MAX_LEADER_REDIRECTS: u8 = 3;
 pub async fn check_and_redirect_to_leader<C: ClusterClient>(
     client: &C,
     current_address: &str,
+    transport: TransportProtocol,
 ) -> Result<Option<String>, IggyError> {
     debug!("Checking cluster metadata for leader detection");
 
@@ -41,7 +43,7 @@ pub async fn check_and_redirect_to_leader<C: ClusterClient>(
                 metadata.nodes.len(),
                 metadata.name
             );
-            process_cluster_metadata(&metadata, current_address)
+            process_cluster_metadata(&metadata, current_address, transport)
         }
         Err(e) if is_feature_unavailable_error(&e) => {
             debug!("Cluster metadata feature unavailable - server doesn't 
support clustering");
@@ -61,6 +63,7 @@ pub async fn check_and_redirect_to_leader<C: ClusterClient>(
 fn process_cluster_metadata(
     metadata: &ClusterMetadata,
     current_address: &str,
+    transport: TransportProtocol,
 ) -> Result<Option<String>, IggyError> {
     let leader = metadata
         .nodes
@@ -69,17 +72,25 @@ fn process_cluster_metadata(
 
     match leader {
         Some(leader_node) => {
+            let leader_port = match transport {
+                TransportProtocol::Tcp => leader_node.endpoints.tcp,
+                TransportProtocol::Quic => leader_node.endpoints.quic,
+                TransportProtocol::Http => leader_node.endpoints.http,
+                TransportProtocol::WebSocket => 
leader_node.endpoints.websocket,
+            };
+            let leader_address = format!("{}:{}", leader_node.ip, leader_port);
+
             info!(
-                "Found leader node: {} at {}",
-                leader_node.name, leader_node.address
+                "Found leader node: {} at {} (using {} transport)",
+                leader_node.name, leader_address, transport
             );
 
-            if !is_same_address(current_address, &leader_node.address) {
+            if !is_same_address(current_address, &leader_address) {
                 info!(
                     "Current connection to {} is not the leader, will redirect 
to {}",
-                    current_address, leader_node.address
+                    current_address, leader_address
                 );
-                Ok(Some(leader_node.address.clone()))
+                Ok(Some(leader_address))
             } else {
                 debug!("Already connected to leader at {}", current_address);
                 Ok(None)
diff --git a/core/sdk/src/prelude.rs b/core/sdk/src/prelude.rs
index 8780dd947..5459b3125 100644
--- a/core/sdk/src/prelude.rs
+++ b/core/sdk/src/prelude.rs
@@ -64,9 +64,9 @@ pub use iggy_common::{
     PollingKind, PollingStrategy, QuicClientConfig, QuicClientConfigBuilder,
     QuicClientReconnectionConfig, SendMessages, Sizeable, SnapshotCompression, 
Stats, Stream,
     StreamDetails, StreamPermissions, SystemSnapshotType, TcpClientConfig, 
TcpClientConfigBuilder,
-    TcpClientReconnectionConfig, Topic, TopicDetails, TopicPermissions, 
TransportProtocol, UserId,
-    UserStatus, Validatable, WebSocketClientConfig, 
WebSocketClientConfigBuilder,
-    WebSocketClientReconnectionConfig, defaults, locking,
+    TcpClientReconnectionConfig, Topic, TopicDetails, TopicPermissions, 
TransportEndpoints,
+    TransportProtocol, UserId, UserStatus, Validatable, WebSocketClientConfig,
+    WebSocketClientConfigBuilder, WebSocketClientReconnectionConfig, defaults, 
locking,
 };
 pub use iggy_common::{
     IGGY_MESSAGE_CHECKSUM_OFFSET_RANGE, IGGY_MESSAGE_HEADER_SIZE,
diff --git a/core/sdk/src/quic/quic_client.rs b/core/sdk/src/quic/quic_client.rs
index ecf5bbf86..3f7f6854f 100644
--- a/core/sdk/src/quic/quic_client.rs
+++ b/core/sdk/src/quic/quic_client.rs
@@ -424,7 +424,12 @@ impl QuicClient {
     /// Returns true if redirection occurred and reconnection is needed.
     pub(crate) async fn handle_leader_redirection(&self) -> Result<bool, 
IggyError> {
         let current_address = self.current_server_address.lock().await.clone();
-        let leader_address = check_and_redirect_to_leader(self, 
&current_address).await?;
+        let leader_address = check_and_redirect_to_leader(
+            self,
+            &current_address,
+            iggy_common::TransportProtocol::Quic,
+        )
+        .await?;
 
         if let Some(new_leader_address) = leader_address {
             let mut redirection_state = 
self.leader_redirection_state.lock().await;
diff --git a/core/sdk/src/tcp/tcp_client.rs b/core/sdk/src/tcp/tcp_client.rs
index 66017fad3..0c25a5173 100644
--- a/core/sdk/src/tcp/tcp_client.rs
+++ b/core/sdk/src/tcp/tcp_client.rs
@@ -476,7 +476,12 @@ impl TcpClient {
     /// Returns true if redirection occurred and reconnection is needed.
     pub(crate) async fn handle_leader_redirection(&self) -> Result<bool, 
IggyError> {
         let current_address = self.current_server_address.lock().await.clone();
-        let leader_address = check_and_redirect_to_leader(self, 
&current_address).await?;
+        let leader_address = check_and_redirect_to_leader(
+            self,
+            &current_address,
+            iggy_common::TransportProtocol::Tcp,
+        )
+        .await?;
 
         if let Some(new_leader_address) = leader_address {
             let mut redirection_state = 
self.leader_redirection_state.lock().await;
diff --git a/core/sdk/src/websocket/websocket_client.rs 
b/core/sdk/src/websocket/websocket_client.rs
index 399df9ba7..ff89d2a30 100644
--- a/core/sdk/src/websocket/websocket_client.rs
+++ b/core/sdk/src/websocket/websocket_client.rs
@@ -437,7 +437,12 @@ impl WebSocketClient {
     /// Returns true if redirection occurred and reconnection is needed.
     pub(crate) async fn handle_leader_redirection(&self) -> Result<bool, 
IggyError> {
         let current_address = self.current_server_address.lock().await.clone();
-        let leader_address = check_and_redirect_to_leader(self, 
&current_address).await?;
+        let leader_address = check_and_redirect_to_leader(
+            self,
+            &current_address,
+            iggy_common::TransportProtocol::WebSocket,
+        )
+        .await?;
 
         if let Some(new_leader_address) = leader_address {
             let mut redirection_state = 
self.leader_redirection_state.lock().await;
diff --git a/core/server/src/configs/cluster.rs 
b/core/server/src/configs/cluster.rs
index 3a0d4506c..59ea8c731 100644
--- a/core/server/src/configs/cluster.rs
+++ b/core/server/src/configs/cluster.rs
@@ -16,7 +16,6 @@
  * under the License.
  */
 
-use iggy_common::TransportProtocol;
 use serde::{Deserialize, Serialize};
 
 #[derive(Debug, Deserialize, Serialize, Clone)]
@@ -24,19 +23,32 @@ pub struct ClusterConfig {
     pub enabled: bool,
     pub name: String,
     pub id: u32,
-    pub transport: TransportProtocol,
     pub node: NodeConfig,
-    pub nodes: Vec<ClusterNodeConfig>,
 }
 
 #[derive(Debug, Deserialize, Serialize, Clone)]
 pub struct NodeConfig {
-    pub id: u32,
+    pub current: CurrentNodeConfig,
+    pub others: Vec<OtherNodeConfig>,
 }
 
 #[derive(Debug, Deserialize, Serialize, Clone)]
-pub struct ClusterNodeConfig {
-    pub id: u32,
+pub struct CurrentNodeConfig {
     pub name: String,
-    pub address: String,
+    pub ip: String,
+}
+
+#[derive(Debug, Deserialize, Serialize, Clone)]
+pub struct OtherNodeConfig {
+    pub name: String,
+    pub ip: String,
+    pub ports: TransportPorts,
+}
+
+#[derive(Debug, Deserialize, Serialize, Clone)]
+pub struct TransportPorts {
+    pub tcp: u16,
+    pub quic: u16,
+    pub http: u16,
+    pub websocket: u16,
 }
diff --git a/core/server/src/configs/config_provider.rs 
b/core/server/src/configs/config_provider.rs
index b77ec4bdf..539aa9cdb 100644
--- a/core/server/src/configs/config_provider.rs
+++ b/core/server/src/configs/config_provider.rs
@@ -168,12 +168,27 @@ impl CustomEnvProvider {
             if remaining_path.is_empty() {
                 arr[array_index] = value;
             } else if let FigmentValue::Dict(_, elem_dict) = &mut 
arr[array_index] {
-                Self::insert_overridden_values_from_env(
-                    &Dict::new(),
-                    elem_dict,
-                    remaining_path.to_vec(),
-                    value,
-                );
+                // For nested structures in arrays, check if we need to create intermediate dicts
+                // Handle the "ports" case where it should be a nested structure
+                if remaining_path.len() >= 2 && remaining_path[0] == "ports" {
+                    // Create the ports dict if it doesn't exist
+                    elem_dict
+                        .entry("ports".to_string())
+                        .or_insert_with(|| FigmentValue::Dict(Tag::Default, 
Dict::new()));
+
+                    if let Some(FigmentValue::Dict(_, ports_dict)) = 
elem_dict.get_mut("ports") {
+                        // Insert the specific port value (tcp, quic, http, websocket)
+                        ports_dict.insert(remaining_path[1].clone(), value);
+                    }
+                } else {
+                    // Default behavior for other fields
+                    Self::insert_overridden_values_from_env(
+                        &Dict::new(),
+                        elem_dict,
+                        remaining_path.to_vec(),
+                        value,
+                    );
+                }
             }
         }
     }
diff --git a/core/server/src/configs/defaults.rs 
b/core/server/src/configs/defaults.rs
index ce990815c..a2ec9f229 100644
--- a/core/server/src/configs/defaults.rs
+++ b/core/server/src/configs/defaults.rs
@@ -19,7 +19,8 @@
 use super::sharding::ShardingConfig;
 use super::system::MemoryPoolConfig;
 use super::tcp::TcpSocketConfig;
-use crate::configs::cluster::{ClusterConfig, ClusterNodeConfig, NodeConfig};
+use crate::configs::cluster::CurrentNodeConfig;
+use crate::configs::cluster::{ClusterConfig, NodeConfig};
 use crate::configs::http::{
     HttpConfig, HttpCorsConfig, HttpJwtConfig, HttpMetricsConfig, 
HttpTlsConfig,
 };
@@ -546,20 +547,7 @@ impl Default for ClusterConfig {
             enabled: SERVER_CONFIG.cluster.enabled,
             id: SERVER_CONFIG.cluster.id as u32,
             name: SERVER_CONFIG.cluster.name.parse().unwrap(),
-            transport: SERVER_CONFIG.cluster.transport.parse().unwrap(),
             node: NodeConfig::default(),
-            nodes: vec![
-                ClusterNodeConfig {
-                    id: SERVER_CONFIG.cluster.nodes[0].id as u32,
-                    name: SERVER_CONFIG.cluster.nodes[0].name.parse().unwrap(),
-                    address: 
SERVER_CONFIG.cluster.nodes[0].address.parse().unwrap(),
-                },
-                ClusterNodeConfig {
-                    id: SERVER_CONFIG.cluster.nodes[1].id as u32,
-                    name: SERVER_CONFIG.cluster.nodes[1].name.parse().unwrap(),
-                    address: 
SERVER_CONFIG.cluster.nodes[1].address.parse().unwrap(),
-                },
-            ],
         }
     }
 }
@@ -567,7 +555,11 @@ impl Default for ClusterConfig {
 impl Default for NodeConfig {
     fn default() -> NodeConfig {
         NodeConfig {
-            id: SERVER_CONFIG.cluster.node.id as u32,
+            current: CurrentNodeConfig {
+                name: SERVER_CONFIG.cluster.node.current.name.parse().unwrap(),
+                ip: SERVER_CONFIG.cluster.node.current.ip.parse().unwrap(),
+            },
+            others: vec![], // Empty by default, will be populated from config 
if present
         }
     }
 }
diff --git a/core/server/src/configs/validators.rs 
b/core/server/src/configs/validators.rs
index 792b8f434..3a4a66a16 100644
--- a/core/server/src/configs/validators.rs
+++ b/core/server/src/configs/validators.rs
@@ -337,77 +337,70 @@ impl Validatable<ConfigError> for ClusterConfig {
             return Err(ConfigError::InvalidConfiguration);
         }
 
-        // Validate nodes list is not empty
-        if self.nodes.is_empty() {
-            eprintln!("Invalid cluster configuration: nodes list cannot be empty");
+        // Validate current node name is not empty
+        if self.node.current.name.trim().is_empty() {
+            eprintln!("Invalid cluster configuration: current node name cannot be empty");
             return Err(ConfigError::InvalidConfiguration);
         }
 
-        // Check if nodes start from ID 0
-        let has_node_zero = self.nodes.iter().any(|node| node.id == 0);
-        if !has_node_zero {
-            eprintln!("Invalid cluster configuration: nodes must start from ID 0");
-            return Err(ConfigError::InvalidConfiguration);
-        }
-
-        // Check if current node ID exists in nodes vector
-        let current_node_exists = self.nodes.iter().any(|node| node.id == self.node.id);
-        if !current_node_exists {
-            eprintln!(
-                "Invalid cluster configuration: current node ID {} not found in nodes list",
-                self.node.id
-            );
-            return Err(ConfigError::InvalidConfiguration);
-        }
+        // Check for duplicate node names among other nodes
+        let mut node_names = std::collections::HashSet::new();
+        node_names.insert(self.node.current.name.clone());
 
-        // Check for duplicate node IDs
-        let mut node_ids = std::collections::HashSet::new();
-        for node in &self.nodes {
-            if !node_ids.insert(node.id) {
+        for node in &self.node.others {
+            if !node_names.insert(node.name.clone()) {
                 eprintln!(
-                    "Invalid cluster configuration: duplicate node ID {} found",
-                    node.id
+                    "Invalid cluster configuration: duplicate node name '{}' found",
+                    node.name
                 );
                 return Err(ConfigError::InvalidConfiguration);
             }
         }
 
-        // Validate unique addresses (IP:port combinations)
-        let mut addresses = std::collections::HashSet::new();
-        for node in &self.nodes {
-            // Validate address format (should contain IP:port or [IPv6]:port)
-            let is_valid_address = if node.address.starts_with('[') {
-                // IPv6 address format: [::1]:8090
-                node.address.contains("]:") && node.address.matches(':').count() >= 2
-            } else {
-                // IPv4 address format: 127.0.0.1:8090
-                node.address.matches(':').count() == 1
-            };
-
-            if !is_valid_address {
-                eprintln!(
-                    "Invalid cluster configuration: malformed address '{}' for node ID {}",
-                    node.address, node.id
-                );
+        // Validate each other node configuration
+        let mut used_endpoints = std::collections::HashSet::new();
+        for node in &self.node.others {
+            // Validate node name is not empty
+            if node.name.trim().is_empty() {
+                eprintln!("Invalid cluster configuration: node name cannot be empty");
                 return Err(ConfigError::InvalidConfiguration);
             }
 
-            // Check for duplicate full addresses
-            if !addresses.insert(node.address.clone()) {
+            // Validate IP is not empty
+            if node.ip.trim().is_empty() {
                 eprintln!(
-                    "Invalid cluster configuration: duplicate address {} found (node ID: {})",
-                    node.address, node.id
+                    "Invalid cluster configuration: IP cannot be empty for node '{}'",
+                    node.name
                 );
                 return Err(ConfigError::InvalidConfiguration);
             }
 
-            // Validate node name is not empty
-            if node.name.trim().is_empty() {
-                eprintln!(
-                    "Invalid cluster configuration: node name cannot be empty for node ID {}",
-                    node.id
-                );
-                return Err(ConfigError::InvalidConfiguration);
+            // Validate transport ports
+            let ports = [
+                ("TCP", node.ports.tcp),
+                ("QUIC", node.ports.quic),
+                ("HTTP", node.ports.http),
+                ("WebSocket", node.ports.websocket),
+            ];
+
+            for (name, port) in &ports {
+                if *port == 0 {
+                    eprintln!(
+                        "Invalid cluster configuration: {} port cannot be 0 for node '{}'",
+                        name, node.name
+                    );
+                    return Err(ConfigError::InvalidConfiguration);
+                }
+
+                // Check for port conflicts across nodes on the same IP
+                let endpoint = format!("{}:{}:{}", node.ip, name, port);
+                if !used_endpoints.insert(endpoint.clone()) {
+                    eprintln!(
+                        "Invalid cluster configuration: port conflict - {}:{} is already used",
+                        node.ip, port
+                    );
+                    return Err(ConfigError::InvalidConfiguration);
+                }
             }
         }
 
diff --git a/core/server/src/shard/system/cluster.rs b/core/server/src/shard/system/cluster.rs
index e4af6d724..91cda8c1b 100644
--- a/core/server/src/shard/system/cluster.rs
+++ b/core/server/src/shard/system/cluster.rs
@@ -18,7 +18,10 @@
 
 use crate::shard::IggyShard;
 use crate::streaming::session::Session;
-use iggy_common::{ClusterMetadata, ClusterNode, ClusterNodeRole, ClusterNodeStatus, IggyError};
+use crate::streaming::utils::address::extract_port;
+use iggy_common::{
+    ClusterMetadata, ClusterNode, ClusterNodeRole, ClusterNodeStatus, IggyError, TransportEndpoints,
+};
 use tracing::trace;
 
 impl IggyShard {
@@ -34,53 +37,99 @@ impl IggyShard {
 
         let cluster_name = self.config.cluster.name.clone();
         let cluster_id = self.config.cluster.id;
+        let current_node_name = self.config.cluster.node.current.name.clone();
 
-        // Cannot fail because we validated it in config
-        let transport = self.config.cluster.transport;
-
-        let own_node_id = self.config.cluster.node.id;
-
-        let nodes: Vec<ClusterNode> = self
-            .config
-            .cluster
-            .nodes
-            .iter()
-            .map(|node_config| {
-                let (role, status) = if node_config.id == own_node_id {
-                    (
-                        if self.is_follower {
-                            ClusterNodeRole::Follower
-                        } else {
-                            ClusterNodeRole::Leader
-                        },
-                        ClusterNodeStatus::Healthy,
-                    )
+        let mut nodes = Vec::new();
+
+        let current_ip = self.config.cluster.node.current.ip.clone();
+
+        // Get the actual bound ports from the shard's bound addresses
+        let current_endpoints = self.get_actual_bound_ports().unwrap_or_else(|| {
+            TransportEndpoints::new(
+                extract_port(&self.config.tcp.address),
+                extract_port(&self.config.quic.address),
+                extract_port(&self.config.http.address),
+                extract_port(&self.config.websocket.address),
+            )
+        });
+
+        nodes.push(ClusterNode {
+            name: current_node_name.clone(),
+            ip: current_ip,
+            endpoints: current_endpoints,
+            role: if self.is_follower {
+                ClusterNodeRole::Follower
+            } else {
+                ClusterNodeRole::Leader
+            },
+            status: ClusterNodeStatus::Healthy,
+        });
+
+        for other_node in &self.config.cluster.node.others {
+            let endpoints = TransportEndpoints::new(
+                other_node.ports.tcp,
+                other_node.ports.quic,
+                other_node.ports.http,
+                other_node.ports.websocket,
+            );
+
+            nodes.push(ClusterNode {
+                name: other_node.name.clone(),
+                ip: other_node.ip.clone(),
+                endpoints,
+                role: if self.is_follower {
+                    ClusterNodeRole::Leader
                 } else {
-                    (
-                        if self.is_follower {
-                            ClusterNodeRole::Leader
-                        } else {
-                            ClusterNodeRole::Follower
-                        },
-                        ClusterNodeStatus::Healthy,
-                    )
-                };
-
-                ClusterNode {
-                    id: node_config.id,
-                    name: node_config.name.clone(),
-                    address: node_config.address.clone(),
-                    role,
-                    status,
-                }
-            })
-            .collect();
+                    ClusterNodeRole::Follower
+                },
+                status: ClusterNodeStatus::Healthy,
+            });
+        }
 
         Ok(ClusterMetadata {
             name: cluster_name,
             id: cluster_id,
-            transport,
             nodes,
         })
     }
+
+    /// Get actual bound ports from the shard's bound addresses
+    /// This is needed when server binds to port 0 (OS-assigned port)
+    fn get_actual_bound_ports(&self) -> Option<TransportEndpoints> {
+        let tcp_port = self
+            .tcp_bound_address
+            .get()
+            .map(|addr| addr.port())
+            .unwrap_or_else(|| extract_port(&self.config.tcp.address));
+
+        let quic_port = self
+            .quic_bound_address
+            .get()
+            .map(|addr| addr.port())
+            .unwrap_or_else(|| extract_port(&self.config.quic.address));
+
+        let http_port = self
+            .http_bound_address
+            .get()
+            .map(|addr| addr.port())
+            .unwrap_or_else(|| extract_port(&self.config.http.address));
+
+        let websocket_port = self
+            .websocket_bound_address
+            .get()
+            .map(|addr| addr.port())
+            .unwrap_or_else(|| extract_port(&self.config.websocket.address));
+
+        trace!(
+            "Using actual bound ports - TCP: {}, QUIC: {}, HTTP: {}, WebSocket: {}",
+            tcp_port, quic_port, http_port, websocket_port
+        );
+
+        Some(TransportEndpoints::new(
+            tcp_port,
+            quic_port,
+            http_port,
+            websocket_port,
+        ))
+    }
 }
diff --git a/core/server/src/streaming/systems/cluster/mod.rs b/core/server/src/streaming/systems/cluster/mod.rs
index 51161280f..09e947ea5 100644
--- a/core/server/src/streaming/systems/cluster/mod.rs
+++ b/core/server/src/streaming/systems/cluster/mod.rs
@@ -18,11 +18,13 @@
 
 use crate::streaming::session::Session;
 use crate::streaming::systems::system::System;
+use crate::streaming::utils::address::{extract_ip, extract_port};
 use iggy_common::ClusterMetadata;
 use iggy_common::ClusterNode;
 use iggy_common::ClusterNodeRole;
 use iggy_common::ClusterNodeStatus;
 use iggy_common::IggyError;
+use iggy_common::TransportEndpoints;
 use tracing::trace;
 
 impl System {
@@ -38,35 +40,49 @@ impl System {
 
         let name = self.cluster_config.name.clone();
         let id = self.cluster_config.id;
-        let transport = self.cluster_config.transport;
+        let current_node_name = self.cluster_config.node.current.name.clone();
 
-        let nodes: Vec<ClusterNode> = self
-            .cluster_config
-            .nodes
-            .iter()
-            .map(|node_config| {
-                let role = if node_config.id == 1 {
-                    ClusterNodeRole::Leader
-                } else {
-                    ClusterNodeRole::Follower
-                };
+        // Build nodes list starting with current node
+        let mut nodes = Vec::new();
 
-                let status = ClusterNodeStatus::Healthy;
+        // Add current node with ports derived from transport configs
+        let current_ip = extract_ip(&self.tcp_config.address);
+        let current_endpoints = TransportEndpoints::new(
+            &current_ip,
+            extract_port(&self.tcp_config.address),
+            extract_port(&self.quic_config.address),
+            extract_port(&self.http_config.address),
+            extract_port(&self.websocket_config.address),
+        );
 
-                ClusterNode {
-                    id: node_config.id,
-                    name: node_config.name.clone(),
-                    address: node_config.address.clone(),
-                    role,
-                    status,
-                }
-            })
-            .collect();
+        nodes.push(ClusterNode {
+            name: current_node_name.clone(),
+            endpoints: current_endpoints,
+            role: ClusterNodeRole::Leader,  // Placeholder
+            status: ClusterNodeStatus::Healthy,
+        });
+
+        // Add other nodes from configuration
+        for other_node in &self.cluster_config.node.others {
+            let endpoints = TransportEndpoints::new(
+                &other_node.ip,
+                other_node.ports.tcp,
+                other_node.ports.quic,
+                other_node.ports.http,
+                other_node.ports.websocket,
+            );
+
+            nodes.push(ClusterNode {
+                name: other_node.name.clone(),
+                endpoints,
+                role: ClusterNodeRole::Follower,  // Placeholder
+                status: ClusterNodeStatus::Healthy,
+            });
+        }
 
         Ok(ClusterMetadata {
             name,
             id,
-            transport,
             nodes,
         })
     }
diff --git a/core/server/src/streaming/utils/address.rs b/core/server/src/streaming/utils/address.rs
new file mode 100644
index 000000000..3c51b8e7b
--- /dev/null
+++ b/core/server/src/streaming/utils/address.rs
@@ -0,0 +1,76 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/// Extracts IP from an address string like "127.0.0.1:8090" or "[::1]:8090"
+pub fn extract_ip(address: &str) -> String {
+    if let Some(colon_pos) = address.rfind(':') {
+        // Handle IPv6 addresses like [::1]:8090
+        if address.starts_with('[')
+            && let Some(bracket_pos) = address.rfind(']')
+        {
+            return address[1..bracket_pos].to_string();
+        }
+        // Handle IPv4 addresses like 127.0.0.1:8090
+        return address[..colon_pos].to_string();
+    }
+    address.to_string()
+}
+
+/// Extracts port from an address string like "127.0.0.1:8090"
+pub fn extract_port(address: &str) -> u16 {
+    if let Some(colon_pos) = address.rfind(':')
+        && let Ok(port) = address[colon_pos + 1..].parse::<u16>()
+    {
+        return port;
+    }
+    0
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_extract_ip_ipv4() {
+        assert_eq!(extract_ip("127.0.0.1:8090"), "127.0.0.1");
+        assert_eq!(extract_ip("192.168.1.100:3000"), "192.168.1.100");
+    }
+
+    #[test]
+    fn test_extract_ip_ipv6() {
+        assert_eq!(extract_ip("[::1]:8090"), "::1");
+        assert_eq!(extract_ip("[2001:db8::1]:443"), "2001:db8::1");
+    }
+
+    #[test]
+    fn test_extract_ip_no_port() {
+        assert_eq!(extract_ip("127.0.0.1"), "127.0.0.1");
+    }
+
+    #[test]
+    fn test_extract_port() {
+        assert_eq!(extract_port("127.0.0.1:8090"), 8090);
+        assert_eq!(extract_port("192.168.1.100:3000"), 3000);
+        assert_eq!(extract_port("[::1]:8090"), 8090);
+    }
+
+    #[test]
+    fn test_extract_port_no_port() {
+        assert_eq!(extract_port("127.0.0.1"), 0);
+    }
+}
diff --git a/core/server/src/streaming/utils/mod.rs b/core/server/src/streaming/utils/mod.rs
index 74f5b7cf8..07896474c 100644
--- a/core/server/src/streaming/utils/mod.rs
+++ b/core/server/src/streaming/utils/mod.rs
@@ -16,6 +16,7 @@
  * under the License.
  */
 
+pub mod address;
 pub mod crypto;
 pub mod file;
 pub mod hash;

Reply via email to