This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch cluster-metadata
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 1d83c1a2c8ff4c658b64d6d1885fc4acaaa739b3
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Nov 17 11:05:47 2025 +0100

    feat(sdk): implement leader-aware reconnect mechanism in Rust SDK
---
 bdd/docker-compose.yml                             | 136 ++++++-
 bdd/rust/Cargo.toml                                |  12 +-
 bdd/rust/tests/common/global_context.rs            |   5 -
 .../{global_context.rs => leader_context.rs}       |  37 +-
 bdd/rust/tests/common/mod.rs                       |   1 +
 .../tests/{steps/mod.rs => leader_redirection.rs}  |  16 +-
 bdd/rust/tests/steps/leader_redirection.rs         | 415 +++++++++++++++++++++
 bdd/rust/tests/steps/mod.rs                        |   1 +
 bdd/rust/tests/steps/server.rs                     |  26 +-
 bdd/scenarios/leader_redirection.feature           |  61 +++
 core/common/src/types/cluster/status.rs            |   2 +
 core/configs/server.toml                           |   8 +-
 core/sdk/src/client_wrappers/connection_info.rs    |  61 +++
 core/sdk/src/client_wrappers/mod.rs                |   1 +
 core/sdk/src/clients/binary_cluster.rs             |   5 +-
 core/sdk/src/clients/binary_users.rs               |  72 +++-
 core/sdk/src/clients/client.rs                     |  17 +-
 core/sdk/src/leader_aware.rs                       | 217 +++++++++++
 core/sdk/src/lib.rs                                |   1 +
 core/sdk/src/prelude.rs                            |  27 +-
 core/sdk/src/quic/quic_client.rs                   | 313 ++++++++++------
 core/sdk/src/tcp/tcp_client.rs                     | 410 +++++++++++---------
 core/server/src/args.rs                            |  17 +
 core/server/src/main.rs                            |   6 +
 core/server/src/quic/listener.rs                   |  38 +-
 core/server/src/shard/builder.rs                   |   7 +
 core/server/src/shard/mod.rs                       |   1 +
 core/server/src/shard/system/cluster.rs            |  40 +-
 core/server/src/tcp/connection_handler.rs          |  33 +-
 29 files changed, 1565 insertions(+), 421 deletions(-)

diff --git a/bdd/docker-compose.yml b/bdd/docker-compose.yml
index 86bbc9fa9..a923fc578 100644
--- a/bdd/docker-compose.yml
+++ b/bdd/docker-compose.yml
@@ -16,6 +16,7 @@
 # under the License.
 
 services:
+  # Original single server for backward compatibility
   iggy-server:
     platform: linux/amd64
     build:
@@ -37,36 +38,159 @@ services:
         soft: -1
         hard: -1
     healthcheck:
-      test: ["CMD", "/usr/local/bin/iggy", "ping"]
-      interval: 1s
+      test: ["CMD", "/usr/local/bin/iggy", "--tcp-server-address", 
"127.0.0.1:8090", "ping"]
+      interval: 20s
       timeout: 3s
       retries: 30
       start_period: 2s
     environment:
-      - IGGY_ROOT_USERNAME=iggy
-      - IGGY_ROOT_PASSWORD=iggy
       - RUST_LOG=info
       - IGGY_SYSTEM_PATH=local_data
       - IGGY_TCP_ADDRESS=0.0.0.0:8090
       - IGGY_HTTP_ADDRESS=0.0.0.0:3000
       - IGGY_QUIC_ADDRESS=0.0.0.0:8080
+      - IGGY_WEBSOCKET_ADDRESS=0.0.0.0:8070
     volumes:
       - iggy_data:/app/local_data
     networks:
       - iggy-bdd-network
 
+  # Leader server for cluster testing
+  iggy-leader:
+    platform: linux/amd64
+    build:
+      context: ..
+      dockerfile: core/server/Dockerfile
+      target: runtime-prebuilt
+      args:
+        PREBUILT_IGGY_SERVER: ${IGGY_SERVER_PATH:-target/debug/iggy-server}
+        PREBUILT_IGGY_CLI: ${IGGY_CLI_PATH:-target/debug/iggy}
+        LIBC: glibc
+        PROFILE: debug
+    # Leader runs without --follower flag
+    command: ["--fresh", "--with-default-root-credentials"]
+    cap_add:
+      - SYS_NICE
+    security_opt:
+      - seccomp:unconfined
+    ulimits:
+      memlock:
+        soft: -1
+        hard: -1
+    healthcheck:
+      test: ["CMD", "/usr/local/bin/iggy", "--tcp-server-address", 
"127.0.0.1:8091", "ping"]
+      interval: 20s
+      timeout: 3s
+      retries: 30
+      start_period: 2s
+    environment:
+      - RUST_LOG=info
+      - IGGY_SYSTEM_PATH=local_data_leader
+      # Server addresses
+      - IGGY_TCP_ADDRESS=0.0.0.0:8091
+      - IGGY_HTTP_ADDRESS=0.0.0.0:3001
+      - IGGY_QUIC_ADDRESS=0.0.0.0:8081
+      - IGGY_WEBSOCKET_ADDRESS=0.0.0.0:8071
+      # Cluster configuration
+      - IGGY_CLUSTER_ENABLED=true
+      - IGGY_CLUSTER_NAME=test-cluster
+      - IGGY_CLUSTER_ID=1
+      - IGGY_CLUSTER_TRANSPORT=tcp
+      # This node's identity
+      - IGGY_CLUSTER_NODE_ID=0
+      - IGGY_CLUSTER_NODE_NAME=leader-node
+      - IGGY_CLUSTER_NODE_ADDRESS=iggy-leader:8091
+      # Cluster nodes configuration (indexed format for array)
+      - IGGY_CLUSTER_NODES_0_ID=0
+      - IGGY_CLUSTER_NODES_0_NAME=leader-node
+      - IGGY_CLUSTER_NODES_0_ADDRESS=iggy-leader:8091
+      - IGGY_CLUSTER_NODES_1_ID=1
+      - IGGY_CLUSTER_NODES_1_NAME=follower-node
+      - IGGY_CLUSTER_NODES_1_ADDRESS=iggy-follower:8092
+    volumes:
+      - iggy_leader_data:/app/local_data_leader
+    networks:
+      - iggy-bdd-network
+
+  # Follower server for cluster testing
+  iggy-follower:
+    platform: linux/amd64
+    build:
+      context: ..
+      dockerfile: core/server/Dockerfile
+      target: runtime-prebuilt
+      args:
+        PREBUILT_IGGY_SERVER: ${IGGY_SERVER_PATH:-target/debug/iggy-server}
+        PREBUILT_IGGY_CLI: ${IGGY_CLI_PATH:-target/debug/iggy}
+        LIBC: glibc
+        PROFILE: debug
+    # Follower runs with --follower flag
+    command: ["--fresh", "--with-default-root-credentials", "--follower"]
+    cap_add:
+      - SYS_NICE
+    security_opt:
+      - seccomp:unconfined
+    ulimits:
+      memlock:
+        soft: -1
+        hard: -1
+    healthcheck:
+      test: ["CMD", "/usr/local/bin/iggy", "--tcp-server-address", 
"127.0.0.1:8092", "ping"]
+      interval: 20s
+      timeout: 3s
+      retries: 30
+      start_period: 2s
+    environment:
+      - RUST_LOG=info
+      - IGGY_SYSTEM_PATH=local_data_follower
+      # Server addresses (different ports from leader)
+      - IGGY_TCP_ADDRESS=0.0.0.0:8092
+      - IGGY_HTTP_ADDRESS=0.0.0.0:3002
+      - IGGY_QUIC_ADDRESS=0.0.0.0:8082
+      - IGGY_WEBSOCKET_ADDRESS=0.0.0.0:8072
+      # Cluster configuration (same as leader)
+      - IGGY_CLUSTER_ENABLED=true
+      - IGGY_CLUSTER_NAME=test-cluster
+      - IGGY_CLUSTER_ID=1
+      - IGGY_CLUSTER_TRANSPORT=tcp
+      # This node's identity (different from leader)
+      - IGGY_CLUSTER_NODE_ID=1
+      - IGGY_CLUSTER_NODE_NAME=follower-node
+      - IGGY_CLUSTER_NODE_ADDRESS=iggy-follower:8092
+      # Cluster nodes configuration (indexed format for array)
+      - IGGY_CLUSTER_NODES_0_ID=0
+      - IGGY_CLUSTER_NODES_0_NAME=leader-node
+      - IGGY_CLUSTER_NODES_0_ADDRESS=iggy-leader:8091
+      - IGGY_CLUSTER_NODES_1_ID=1
+      - IGGY_CLUSTER_NODES_1_NAME=follower-node
+      - IGGY_CLUSTER_NODES_1_ADDRESS=iggy-follower:8092
+    volumes:
+      - iggy_follower_data:/app/local_data_follower
+    networks:
+      - iggy-bdd-network
+
   rust-bdd:
     build:
       context: ..
       dockerfile: bdd/rust/Dockerfile
     depends_on:
-      - iggy-server
+      iggy-server:
+        condition: service_healthy
+      iggy-leader:
+        condition: service_healthy
+      iggy-follower:
+        condition: service_healthy
     environment:
       - IGGY_ROOT_USERNAME=iggy
       - IGGY_ROOT_PASSWORD=iggy
       - IGGY_TCP_ADDRESS=iggy-server:8090
+      # Additional addresses for leader redirection tests
+      - IGGY_TCP_ADDRESS_LEADER=iggy-leader:8091
+      - IGGY_TCP_ADDRESS_FOLLOWER=iggy-follower:8092
+      - RUST_LOG=debug
     volumes:
       - 
./scenarios/basic_messaging.feature:/app/features/basic_messaging.feature
+      - 
./scenarios/leader_redirection.feature:/app/features/leader_redirection.feature
     command:
       [
         "cargo",
@@ -149,3 +273,5 @@ networks:
 
 volumes:
   iggy_data:
+  iggy_leader_data:
+  iggy_follower_data:
diff --git a/bdd/rust/Cargo.toml b/bdd/rust/Cargo.toml
index 18f2bf88e..ab8b019c0 100644
--- a/bdd/rust/Cargo.toml
+++ b/bdd/rust/Cargo.toml
@@ -41,11 +41,7 @@ name = "basic_messaging"
 harness = false
 required-features = ["bdd"]
 
-# Future test scenarios can be added here:
-# [[test]]
-# name = "user_management"
-# harness = false
-#
-# [[test]]
-# name = "consumer_groups"
-# harness = false
+[[test]]
+name = "leader_redirection"
+harness = false
+required-features = ["bdd"]
diff --git a/bdd/rust/tests/common/global_context.rs 
b/bdd/rust/tests/common/global_context.rs
index a526f873e..a425c6776 100644
--- a/bdd/rust/tests/common/global_context.rs
+++ b/bdd/rust/tests/common/global_context.rs
@@ -20,13 +20,8 @@ use cucumber::World;
 use iggy::clients::client::IggyClient;
 use iggy::prelude::{IggyMessage, PolledMessages};
 
-#[cfg(not(feature = "iggy-server-in-docker"))]
-use integration::test_server::TestServer;
-
 #[derive(Debug, World, Default)]
 pub struct GlobalContext {
-    #[cfg(not(feature = "iggy-server-in-docker"))]
-    pub server: Option<TestServer>,
     pub client: Option<IggyClient>,
     pub server_addr: Option<String>,
     pub last_stream_id: Option<u32>,
diff --git a/bdd/rust/tests/common/global_context.rs 
b/bdd/rust/tests/common/leader_context.rs
similarity index 58%
copy from bdd/rust/tests/common/global_context.rs
copy to bdd/rust/tests/common/leader_context.rs
index a526f873e..913866f11 100644
--- a/bdd/rust/tests/common/global_context.rs
+++ b/bdd/rust/tests/common/leader_context.rs
@@ -18,22 +18,31 @@
 
 use cucumber::World;
 use iggy::clients::client::IggyClient;
-use iggy::prelude::{IggyMessage, PolledMessages};
-
-#[cfg(not(feature = "iggy-server-in-docker"))]
-use integration::test_server::TestServer;
+use iggy::prelude::ClusterNode;
+use std::collections::HashMap;
 
 #[derive(Debug, World, Default)]
-pub struct GlobalContext {
-    #[cfg(not(feature = "iggy-server-in-docker"))]
-    pub server: Option<TestServer>,
-    pub client: Option<IggyClient>,
-    pub server_addr: Option<String>,
+pub struct LeaderContext {
+    // Server addresses by name (e.g., "leader" -> "iggy-leader:8091")
+    pub server_addrs: HashMap<String, String>,
+
+    // Multiple clients for testing
+    pub clients: HashMap<String, IggyClient>,
+
+    // Track which client connected to which server
+    pub client_connections: HashMap<String, String>,
+
+    // Track if redirection happened
+    pub redirection_occurred: bool,
+
+    // Cluster configuration
+    pub cluster_enabled: bool,
+    pub node_count: usize,
+
+    // Node configurations
+    pub nodes: Vec<ClusterNode>,
+
+    // Fields for compatibility with existing step definitions
     pub last_stream_id: Option<u32>,
     pub last_stream_name: Option<String>,
-    pub last_topic_id: Option<u32>,
-    pub last_topic_name: Option<String>,
-    pub last_topic_partitions: Option<u32>,
-    pub last_polled_messages: Option<PolledMessages>,
-    pub last_sent_message: Option<IggyMessage>,
 }
diff --git a/bdd/rust/tests/common/mod.rs b/bdd/rust/tests/common/mod.rs
index 0c28f3a53..16682d2a9 100644
--- a/bdd/rust/tests/common/mod.rs
+++ b/bdd/rust/tests/common/mod.rs
@@ -17,3 +17,4 @@
  */
 
 pub mod global_context;
+pub mod leader_context;
diff --git a/bdd/rust/tests/steps/mod.rs b/bdd/rust/tests/leader_redirection.rs
similarity index 75%
copy from bdd/rust/tests/steps/mod.rs
copy to bdd/rust/tests/leader_redirection.rs
index e9d00b626..08598565e 100644
--- a/bdd/rust/tests/steps/mod.rs
+++ b/bdd/rust/tests/leader_redirection.rs
@@ -16,8 +16,14 @@
  * under the License.
  */
 
-pub mod auth;
-pub mod messages;
-pub mod server;
-pub mod streams;
-pub mod topics;
+pub(crate) mod common;
+pub(crate) mod helpers;
+pub(crate) mod steps;
+
+use crate::common::leader_context::LeaderContext;
+use cucumber::World;
+
+#[tokio::main]
+async fn main() {
+    LeaderContext::run("../../bdd/scenarios/leader_redirection.feature").await;
+}
diff --git a/bdd/rust/tests/steps/leader_redirection.rs 
b/bdd/rust/tests/steps/leader_redirection.rs
new file mode 100644
index 000000000..0f7f7d72d
--- /dev/null
+++ b/bdd/rust/tests/steps/leader_redirection.rs
@@ -0,0 +1,415 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::common::leader_context::LeaderContext;
+use cucumber::{given, then, when};
+use iggy::prelude::*;
+use integration::tcp_client::TcpClientFactory;
+use integration::test_server::{ClientFactory, login_root};
+use std::time::Duration;
+use tokio::time::sleep;
+
+// Background steps for cluster configuration
+#[given(regex = r"^I have cluster configuration enabled with (\d+) nodes$")]
+async fn given_cluster_config(world: &mut LeaderContext, node_count: usize) {
+    world.cluster_enabled = true;
+    world.node_count = node_count;
+    world.nodes = Vec::with_capacity(node_count);
+}
+
+#[given(regex = r"^node (\d+) is configured on port (\d+)$")]
+async fn given_node_configured(world: &mut LeaderContext, node_id: u32, port: 
u16) {
+    let node = ClusterNode {
+        id: node_id,
+        name: format!("node-{}", node_id),
+        address: format!("iggy-server:{}", port),
+        role: ClusterNodeRole::Follower,
+        status: ClusterNodeStatus::Healthy,
+    };
+    world.nodes.push(node);
+}
+
+// Leader/follower server setup - in Docker mode, servers are already running
+#[given(regex = r"^I start server (\d+) on port (\d+) as leader$")]
+async fn given_start_leader_server(world: &mut LeaderContext, node_id: u32, 
port: u16) {
+    // In Docker mode, determine the address based on port
+    let addr = match port {
+        8091 => std::env::var("IGGY_TCP_ADDRESS_LEADER")
+            .unwrap_or_else(|_| "iggy-leader:8091".to_string()),
+        _ => format!("iggy-server:{}", port),
+    };
+    world.server_addrs.insert("leader".to_string(), addr);
+
+    if let Some(node) = world
+        .nodes
+        .iter_mut()
+        .find(|n| n.id == node_id && n.address.ends_with(&format!(":{}", 
port)))
+    {
+        node.role = ClusterNodeRole::Leader;
+    }
+}
+
+#[given(regex = r"^I start server (\d+) on port (\d+) as follower$")]
+async fn given_start_follower_server(world: &mut LeaderContext, node_id: u32, 
port: u16) {
+    // In Docker mode, determine the address based on port
+    let addr = match port {
+        8092 => std::env::var("IGGY_TCP_ADDRESS_FOLLOWER")
+            .unwrap_or_else(|_| "iggy-follower:8092".to_string()),
+        _ => format!("iggy-server:{}", port),
+    };
+    world.server_addrs.insert("follower".to_string(), addr);
+
+    if let Some(node) = world
+        .nodes
+        .iter_mut()
+        .find(|n| n.id == node_id && n.address.ends_with(&format!(":{}", 
port)))
+    {
+        node.role = ClusterNodeRole::Follower;
+    }
+}
+
+#[given(regex = r"^I start a single server on port (\d+) without clustering 
enabled$")]
+async fn given_single_server_no_cluster(world: &mut LeaderContext, port: u16) {
+    // In Docker mode, use the main iggy-server
+    let addr = match port {
+        8090 => {
+            std::env::var("IGGY_TCP_ADDRESS").unwrap_or_else(|_| 
"iggy-server:8090".to_string())
+        }
+        _ => format!("iggy-server:{}", port),
+    };
+    world.server_addrs.insert("single".to_string(), addr);
+    world.cluster_enabled = false;
+}
+
+// Client connection steps
+#[when(regex = r"^I create a client connecting to follower on port (\d+)$")]
+async fn when_connect_to_follower(world: &mut LeaderContext, port: u16) {
+    // Find the node with this port that is marked as follower
+    let node = world.nodes.iter().find(|n| {
+        n.address.ends_with(&format!(":{}", port)) && matches!(n.role, 
ClusterNodeRole::Follower)
+    });
+    assert!(
+        node.is_some(),
+        "Follower node on port {} should be configured",
+        port
+    );
+
+    let addr = world
+        .server_addrs
+        .get("follower")
+        .expect("Follower server should be configured")
+        .clone();
+    create_client_for_addr(world, "main", &addr).await;
+}
+
+#[when(regex = r"^I create a client connecting directly to leader on port 
(\d+)$")]
+async fn when_connect_to_leader(world: &mut LeaderContext, port: u16) {
+    // Find the node with this port that is marked as leader
+    let node = world.nodes.iter().find(|n| {
+        n.address.ends_with(&format!(":{}", port)) && matches!(n.role, 
ClusterNodeRole::Leader)
+    });
+    assert!(
+        node.is_some(),
+        "Leader node on port {} should be configured",
+        port
+    );
+
+    let addr = world
+        .server_addrs
+        .get("leader")
+        .expect("Leader server should be configured")
+        .clone();
+    create_client_for_addr(world, "main", &addr).await;
+    world.redirection_occurred = false;
+}
+
+#[when(regex = r"^I create a client connecting to port (\d+)$")]
+async fn when_connect_to_port(world: &mut LeaderContext, port: u16) {
+    assert_eq!(port, 8090, "Single server should be on port 8090");
+    let addr = world
+        .server_addrs
+        .get("single")
+        .expect("Single server should be configured")
+        .clone();
+    create_client_for_addr(world, "main", &addr).await;
+}
+
+#[when(regex = r"^I create client ([A-Z]) connecting to port (\d+)$")]
+async fn when_create_named_client(world: &mut LeaderContext, client_name: 
String, port: u16) {
+    // Determine which server based on port
+    let addr = if port == 8091 {
+        world.server_addrs.get("leader").cloned()
+    } else {
+        world.server_addrs.get("follower").cloned()
+    }
+    .expect("Server should be configured");
+    create_client_for_addr(world, &client_name, &addr).await;
+}
+
+// Authentication steps
+#[when("I authenticate as root user")]
+async fn when_authenticate_root(world: &mut LeaderContext) {
+    let client = world.clients.get("main").expect("Client should be created");
+
+    login_root(client).await;
+}
+
+#[when("both clients authenticate as root user")]
+async fn when_both_clients_authenticate(world: &mut LeaderContext) {
+    for (_, client) in world.clients.iter() {
+        login_root(client).await;
+        sleep(Duration::from_millis(100)).await;
+    }
+}
+
+// Stream creation step for leader context
+#[when(regex = r#"^I create a stream named "(.+)"$"#)]
+async fn when_create_stream_in_cluster(world: &mut LeaderContext, stream_name: 
String) {
+    let client = world
+        .clients
+        .get("main")
+        .expect("Client should be available");
+    let stream = client
+        .create_stream(&stream_name)
+        .await
+        .expect("Should be able to create stream");
+
+    world.last_stream_id = Some(stream.id);
+    world.last_stream_name = Some(stream.name.clone());
+}
+
+#[then("the stream should be created successfully on the leader")]
+async fn then_stream_created_on_leader(world: &mut LeaderContext) {
+    assert!(
+        world.last_stream_id.is_some(),
+        "Stream should have been created on leader"
+    );
+}
+
+// Verification steps
+#[then(regex = r"^the client should automatically redirect to leader on port 
(\d+)$")]
+async fn then_client_redirected(world: &mut LeaderContext, leader_port: u16) {
+    let client = world.clients.get("main").expect("Client should exist");
+
+    // Get the actual connection info to verify we're connected to the leader
+    let connection_info = client.get_connection_info().await;
+
+    // Check if the connection address matches the leader port
+    assert!(
+        connection_info
+            .server_address
+            .contains(&format!(":{}", leader_port)),
+        "Client should be connected to leader on port {}, but is connected to: 
{}",
+        leader_port,
+        connection_info.server_address
+    );
+
+    // Also verify through cluster metadata
+    if let Ok(metadata) = client.get_cluster_metadata().await {
+        let leader_node = metadata
+            .nodes
+            .iter()
+            .find(|n| matches!(n.role, ClusterNodeRole::Leader));
+
+        assert!(
+            leader_node.is_some(),
+            "Leader node should be found in cluster metadata"
+        );
+    }
+
+    world.redirection_occurred = true;
+}
+
+#[then("the client should not perform any redirection")]
+async fn then_no_redirection(world: &mut LeaderContext) {
+    assert!(
+        !world.redirection_occurred,
+        "No redirection should occur when connecting directly to leader"
+    );
+}
+
+#[then(regex = r"^the connection should remain on port (\d+)$")]
+async fn then_connection_remains(world: &mut LeaderContext, port: u16) {
+    let client = world.clients.get("main").expect("Client should exist");
+
+    // Get the actual connection info
+    let connection_info = client.get_connection_info().await;
+
+    // Verify the client is still connected to the original port
+    assert!(
+        connection_info
+            .server_address
+            .contains(&format!(":{}", port)),
+        "Connection should remain on port {}, but is connected to: {}",
+        port,
+        connection_info.server_address
+    );
+
+    // Verify no redirection occurred
+    assert!(
+        !world.redirection_occurred,
+        "Connection should not have been redirected"
+    );
+}
+
+#[then("the client should connect successfully without redirection")]
+async fn then_connect_without_redirection(world: &mut LeaderContext) {
+    let client = world.clients.get("main").expect("Client should exist");
+
+    // Verify client is connected and can ping
+    let ping_result = client.ping().await;
+    assert!(ping_result.is_ok(), "Client should be able to ping server");
+    assert!(
+        !world.redirection_occurred,
+        "No redirection should occur without clustering"
+    );
+}
+
+#[then(regex = r"^client ([A-Z]) should stay connected to port (\d+)$")]
+async fn then_client_stays_connected(world: &mut LeaderContext, client_name: 
String, port: u16) {
+    let client = world
+        .clients
+        .get(&client_name)
+        .expect("Client should exist");
+
+    // Get the actual connection info
+    let connection_info = client.get_connection_info().await;
+
+    // Verify the client is still connected to the expected port
+    assert!(
+        connection_info
+            .server_address
+            .contains(&format!(":{}", port)),
+        "Client {} should stay connected to port {}, but is connected to: {}",
+        client_name,
+        port,
+        connection_info.server_address
+    );
+
+    let ping_result = client.ping().await;
+    assert!(
+        ping_result.is_ok(),
+        "Client {} should be able to ping",
+        client_name
+    );
+}
+
+#[then(regex = r"^client ([A-Z]) should redirect to port (\d+)$")]
+async fn then_client_redirects(world: &mut LeaderContext, client_name: String, 
leader_port: u16) {
+    let client = world
+        .clients
+        .get(&client_name)
+        .expect("Client should exist");
+
+    // Get the actual connection info to verify redirection
+    let connection_info = client.get_connection_info().await;
+
+    // Verify the client is now connected to the leader port
+    assert!(
+        connection_info
+            .server_address
+            .contains(&format!(":{}", leader_port)),
+        "Client {} should redirect to port {}, but is connected to: {}",
+        client_name,
+        leader_port,
+        connection_info.server_address
+    );
+
+    // After authentication, the SDK should have redirected to leader
+    if let Ok(metadata) = client.get_cluster_metadata().await {
+        let leader = metadata
+            .nodes
+            .iter()
+            .find(|n| matches!(n.role, ClusterNodeRole::Leader));
+
+        assert!(
+            leader.is_some(),
+            "Client {} should find leader in cluster metadata",
+            client_name
+        );
+    }
+}
+
+#[then("both clients should be using the same server")]
+async fn then_both_use_same_server(world: &mut LeaderContext) {
+    let client_a = world.clients.get("A").expect("Client A should exist");
+    let client_b = world.clients.get("B").expect("Client B should exist");
+
+    // Get connection info for both clients
+    let conn_info_a = client_a.get_connection_info().await;
+    let conn_info_b = client_b.get_connection_info().await;
+
+    // Verify both clients are connected to the same server
+    assert_eq!(
+        conn_info_a.server_address, conn_info_b.server_address,
+        "Both clients should be connected to the same server. Client A: {}, 
Client B: {}",
+        conn_info_a.server_address, conn_info_b.server_address
+    );
+
+    // Both clients should be able to ping the server
+    assert!(
+        client_a.ping().await.is_ok(),
+        "Client A should be able to ping"
+    );
+    assert!(
+        client_b.ping().await.is_ok(),
+        "Client B should be able to ping"
+    );
+
+    // If cluster metadata is available, verify both see the same leader
+    if let (Ok(metadata_a), Ok(metadata_b)) = (
+        client_a.get_cluster_metadata().await,
+        client_b.get_cluster_metadata().await,
+    ) {
+        let leader_a = metadata_a
+            .nodes
+            .iter()
+            .find(|n| matches!(n.role, ClusterNodeRole::Leader));
+
+        let leader_b = metadata_b
+            .nodes
+            .iter()
+            .find(|n| matches!(n.role, ClusterNodeRole::Leader));
+
+        if let (Some(leader_a), Some(leader_b)) = (leader_a, leader_b) {
+            assert_eq!(
+                leader_a.address, leader_b.address,
+                "Both clients should see the same leader"
+            );
+        }
+    }
+}
+
+// Helper function to create a client for a given address
+async fn create_client_for_addr(world: &mut LeaderContext, client_name: &str, 
addr: &str) {
+    let client_factory = TcpClientFactory {
+        server_addr: addr.to_string(),
+        ..Default::default()
+    };
+
+    let client = client_factory.create_client().await;
+    let client = IggyClient::create(client, None, None);
+
+    // Connect the client before authentication
+    client.connect().await.expect("Client should connect");
+
+    world
+        .client_connections
+        .insert(client_name.to_string(), addr.to_string());
+    world.clients.insert(client_name.to_string(), client);
+}
diff --git a/bdd/rust/tests/steps/mod.rs b/bdd/rust/tests/steps/mod.rs
index e9d00b626..45c8bde63 100644
--- a/bdd/rust/tests/steps/mod.rs
+++ b/bdd/rust/tests/steps/mod.rs
@@ -17,6 +17,7 @@
  */
 
 pub mod auth;
+pub mod leader_redirection;
 pub mod messages;
 pub mod server;
 pub mod streams;
diff --git a/bdd/rust/tests/steps/server.rs b/bdd/rust/tests/steps/server.rs
index ad688f73b..a4027875e 100644
--- a/bdd/rust/tests/steps/server.rs
+++ b/bdd/rust/tests/steps/server.rs
@@ -19,28 +19,10 @@
 use crate::common::global_context::GlobalContext;
 use cucumber::given;
 
-#[cfg(not(feature = "iggy-server-in-docker"))]
-use integration::test_server::TestServer;
-
 #[given("I have a running Iggy server")]
 pub async fn given_running_server(world: &mut GlobalContext) {
-    #[cfg(feature = "iggy-server-in-docker")]
-    {
-        // External server mode - connect to server from environment
-        let server_addr =
-            std::env::var("IGGY_TCP_ADDRESS").unwrap_or_else(|_| 
"localhost:8090".to_string());
-        world.server_addr = Some(server_addr);
-        // No TestServer instance in external mode
-    }
-
-    #[cfg(not(feature = "iggy-server-in-docker"))]
-    {
-        // Embedded server mode - start our own TestServer
-        let mut test_server = TestServer::default();
-        test_server.start();
-        let server_addr = test_server.get_raw_tcp_addr().unwrap();
-
-        world.server_addr = Some(server_addr);
-        world.server = Some(test_server);
-    }
+    // External server mode - connect to server from environment
+    let server_addr =
+        std::env::var("IGGY_TCP_ADDRESS").unwrap_or_else(|_| 
"localhost:8090".to_string());
+    world.server_addr = Some(server_addr);
 }
diff --git a/bdd/scenarios/leader_redirection.feature 
b/bdd/scenarios/leader_redirection.feature
new file mode 100644
index 000000000..0f6c76701
--- /dev/null
+++ b/bdd/scenarios/leader_redirection.feature
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+Feature: Leader-Aware Client Connections
+  As a developer using Apache Iggy with clustering
+  I want my SDK clients to automatically connect to the leader node
+  So that I can always interact with the correct server instance
+
+  Background:
+    Given I have cluster configuration enabled with 2 nodes
+    And node 0 is configured on port 8091
+    And node 1 is configured on port 8092
+
+  Scenario: Client redirects from follower to leader
+    Given I start server 0 on port 8091 as leader
+    And I start server 1 on port 8092 as follower
+    When I create a client connecting to follower on port 8092
+    And I authenticate as root user
+    Then the client should automatically redirect to leader on port 8091
+    When I create a stream named "test-stream"
+    Then the stream should be created successfully on the leader
+
+  Scenario: Client connects directly to leader without redirection
+    Given I start server 0 on port 8091 as leader
+    And I start server 1 on port 8092 as follower
+    When I create a client connecting directly to leader on port 8091
+    And I authenticate as root user
+    Then the client should not perform any redirection
+    And the connection should remain on port 8091
+
+  Scenario: Client handles missing cluster metadata gracefully
+    Given I start a single server on port 8090 without clustering enabled
+    When I create a client connecting to port 8090
+    And I authenticate as root user
+    Then the client should connect successfully without redirection
+    When I create a stream named "single-server-stream"
+    Then the stream should be created successfully on the leader
+
+  Scenario: Multiple clients converge to the same leader
+    Given I start server 0 on port 8091 as leader
+    And I start server 1 on port 8092 as follower
+    When I create client A connecting to port 8091
+    And I create client B connecting to port 8092
+    And both clients authenticate as root user
+    Then client A should stay connected to port 8091
+    And client B should redirect to port 8091
+    And both clients should be using the same server
\ No newline at end of file
diff --git a/core/common/src/types/cluster/status.rs 
b/core/common/src/types/cluster/status.rs
index e60932f1a..ddb8c07d3 100644
--- a/core/common/src/types/cluster/status.rs
+++ b/core/common/src/types/cluster/status.rs
@@ -38,6 +38,8 @@ pub enum ClusterNodeStatus {
     Unreachable,
     /// Node is in maintenance mode
     Maintenance,
+    /// Node is unknown
+    Unknown,
 }
 
 impl TryFrom<u8> for ClusterNodeStatus {
diff --git a/core/configs/server.toml b/core/configs/server.toml
index db689b3ea..42f01d555 100644
--- a/core/configs/server.toml
+++ b/core/configs/server.toml
@@ -511,7 +511,7 @@ enabled = false
 
 # Unique cluster identifier (u32).
 # All nodes in the same cluster must share the same cluster id.
-id = 1
+id = 0
 
 # Unique cluster name (string).
 # All nodes in the same cluster must share the same name.
@@ -526,16 +526,16 @@ transport = "tcp"
 [cluster.node]
 # This node unique identifier within the cluster (u32).
 # Must be unique across all nodes and match one of the ids in the nodes list.
-id = 1
+id = 0
 
 # All nodes in the cluster.
 [[cluster.nodes]]
-id = 1
+id = 0
 name = "iggy-node-1"
 address = "127.0.0.1:8090"
 
 [[cluster.nodes]]
-id = 2
+id = 1
 name = "iggy-node-2"
 address = "127.0.0.1:8091"
 
diff --git a/core/sdk/src/client_wrappers/connection_info.rs 
b/core/sdk/src/client_wrappers/connection_info.rs
new file mode 100644
index 000000000..2d6d52d4a
--- /dev/null
+++ b/core/sdk/src/client_wrappers/connection_info.rs
@@ -0,0 +1,61 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::client_wrappers::client_wrapper::ClientWrapper;
+use iggy_common::TransportProtocol;
+
+/// Connection information for the current client connection
+#[derive(Debug, Clone)]
+pub struct ConnectionInfo {
+    /// The transport protocol being used
+    pub protocol: TransportProtocol,
+    /// The current server address the client is connected to
+    pub server_address: String,
+}
+
+impl ClientWrapper {
+    /// Returns the current connection information including protocol and 
server address
+    pub async fn get_connection_info(&self) -> ConnectionInfo {
+        match self {
+            ClientWrapper::Iggy(_) => {
+                // This variant should not be used in practice as IggyClient 
always wraps
+                // one of the concrete transport clients. Return a 
default/placeholder.
+                ConnectionInfo {
+                    protocol: TransportProtocol::Tcp,
+                    server_address: String::from("unknown"),
+                }
+            }
+            ClientWrapper::Tcp(client) => ConnectionInfo {
+                protocol: TransportProtocol::Tcp,
+                server_address: 
client.current_server_address.lock().await.clone(),
+            },
+            ClientWrapper::Quic(client) => ConnectionInfo {
+                protocol: TransportProtocol::Quic,
+                server_address: 
client.current_server_address.lock().await.to_string(),
+            },
+            ClientWrapper::Http(client) => ConnectionInfo {
+                protocol: TransportProtocol::Http,
+                server_address: client.api_url.to_string(),
+            },
+            ClientWrapper::WebSocket(client) => ConnectionInfo {
+                protocol: TransportProtocol::WebSocket,
+                server_address: client.config.server_address.clone(),
+            },
+        }
+    }
+}
diff --git a/core/sdk/src/client_wrappers/mod.rs 
b/core/sdk/src/client_wrappers/mod.rs
index bd093e01b..2cf980606 100644
--- a/core/sdk/src/client_wrappers/mod.rs
+++ b/core/sdk/src/client_wrappers/mod.rs
@@ -29,3 +29,4 @@ mod binary_system_client;
 mod binary_topic_client;
 mod binary_user_client;
 pub mod client_wrapper;
+pub mod connection_info;
diff --git a/core/sdk/src/clients/binary_cluster.rs 
b/core/sdk/src/clients/binary_cluster.rs
index 18ffc6f9b..6ab7ead4a 100644
--- a/core/sdk/src/clients/binary_cluster.rs
+++ b/core/sdk/src/clients/binary_cluster.rs
@@ -19,14 +19,11 @@
 use crate::prelude::IggyClient;
 use async_trait::async_trait;
 use iggy_binary_protocol::ClusterClient;
-use iggy_common::{ClusterMetadata, IggyError};
+use iggy_common::{ClusterMetadata, IggyError, locking::IggyRwLockFn};
 
 #[async_trait]
 impl ClusterClient for IggyClient {
     async fn get_cluster_metadata(&self) -> Result<ClusterMetadata, IggyError> 
{
-        todo!();
-        /*
         self.client.read().await.get_cluster_metadata().await
-        */
     }
 }
diff --git a/core/sdk/src/clients/binary_users.rs 
b/core/sdk/src/clients/binary_users.rs
index c13ab9288..9390a6021 100644
--- a/core/sdk/src/clients/binary_users.rs
+++ b/core/sdk/src/clients/binary_users.rs
@@ -16,13 +16,16 @@
  * under the License.
  */
 
+use crate::client_wrappers::client_wrapper::ClientWrapper;
+use crate::leader_aware::check_and_redirect_to_leader;
 use crate::prelude::IggyClient;
 use async_trait::async_trait;
-use iggy_binary_protocol::UserClient;
+use iggy_binary_protocol::{Client, UserClient};
 use iggy_common::locking::IggyRwLockFn;
 use iggy_common::{
     Identifier, IdentityInfo, IggyError, Permissions, UserInfo, 
UserInfoDetails, UserStatus,
 };
+use tracing::info;
 
 #[async_trait]
 impl UserClient for IggyClient {
@@ -91,11 +94,74 @@ impl UserClient for IggyClient {
     }
 
     async fn login_user(&self, username: &str, password: &str) -> 
Result<IdentityInfo, IggyError> {
-        self.client
+        // Perform the initial login
+        let identity = self
+            .client
             .read()
             .await
             .login_user(username, password)
-            .await
+            .await?;
+
+        // Check if we need to redirect to leader (only for TCP and QUIC 
clients)
+        let new_leader_addr = {
+            let client = self.client.read().await;
+            match &*client {
+                ClientWrapper::Tcp(tcp_client) => {
+                    let current_addr = 
tcp_client.current_server_address.lock().await.clone();
+                    check_and_redirect_to_leader(tcp_client, 
&current_addr).await?
+                }
+                ClientWrapper::Quic(quic_client) => {
+                    let current_addr = 
quic_client.current_server_address.lock().await.to_string();
+                    check_and_redirect_to_leader(quic_client, 
&current_addr).await?
+                }
+                _ => None, // HTTP and WebSocket don't support leader 
redirection
+            }
+        };
+
+        // If redirection is needed, disconnect, update address, and reconnect
+        if let Some(leader_addr) = new_leader_addr {
+            info!(
+                "Redirecting to leader at {} after manual login",
+                leader_addr
+            );
+
+            // Disconnect from current server
+            self.client.read().await.disconnect().await?;
+
+            // Update the server address and reconnect
+            {
+                let client = self.client.read().await;
+                match &*client {
+                    ClientWrapper::Tcp(tcp_client) => {
+                        *tcp_client.current_server_address.lock().await = 
leader_addr.clone();
+                    }
+                    ClientWrapper::Quic(quic_client) => {
+                        use std::net::SocketAddr;
+                        use std::str::FromStr;
+
+                        if let Ok(socket_addr) = 
SocketAddr::from_str(&leader_addr) {
+                            *quic_client.current_server_address.lock().await = 
socket_addr;
+                        }
+                    }
+                    _ => {}
+                }
+            }
+
+            // Reconnect to the leader
+            self.client.read().await.connect().await?;
+
+            // Re-authenticate with the leader
+            let identity = self
+                .client
+                .read()
+                .await
+                .login_user(username, password)
+                .await?;
+
+            Ok(identity)
+        } else {
+            Ok(identity)
+        }
     }
 
     async fn logout_user(&self) -> Result<(), IggyError> {
diff --git a/core/sdk/src/clients/client.rs b/core/sdk/src/clients/client.rs
index 15b39db2b..b2dd19689 100644
--- a/core/sdk/src/clients/client.rs
+++ b/core/sdk/src/clients/client.rs
@@ -16,11 +16,9 @@
  * under the License.
  */
 
-use crate::clients::client_builder::IggyClientBuilder;
-use iggy_common::Consumer;
-use iggy_common::locking::{IggyRwLock, IggyRwLockFn};
-
 use crate::client_wrappers::client_wrapper::ClientWrapper;
+use crate::client_wrappers::connection_info::ConnectionInfo;
+use crate::clients::client_builder::IggyClientBuilder;
 use crate::http::http_client::HttpClient;
 use crate::prelude::EncryptorKind;
 use crate::prelude::IggyConsumerBuilder;
@@ -32,6 +30,8 @@ use crate::websocket::websocket_client::WebSocketClient;
 use async_broadcast::Receiver;
 use async_trait::async_trait;
 use iggy_binary_protocol::{Client, SystemClient};
+use iggy_common::Consumer;
+use iggy_common::locking::{IggyRwLock, IggyRwLockFn};
 use iggy_common::{ConnectionStringUtils, DiagnosticEvent, Partitioner, 
TransportProtocol};
 use std::fmt::Debug;
 use std::sync::Arc;
@@ -63,7 +63,6 @@ impl IggyClient {
         IggyClientBuilder::new()
     }
 
-    /// Creates a new `IggyClientBuilder` from the provided connection string.
     /// Creates a new `IggyClientBuilder` from the provided connection string.
     pub fn builder_from_connection_string(
         connection_string: &str,
@@ -81,7 +80,6 @@ impl IggyClient {
         }
     }
 
-    /// Creates a new `IggyClient` from the provided connection string.
     /// Creates a new `IggyClient` from the provided connection string.
     pub fn from_connection_string(connection_string: &str) -> Result<Self, 
IggyError> {
         match ConnectionStringUtils::parse_protocol(connection_string)? {
@@ -177,6 +175,13 @@ impl IggyClient {
             None,
         ))
     }
+
+    /// Returns the current connection information including the transport 
protocol and server address.
+    /// This is useful for verifying which server the client is connected to, 
especially after
+    /// leader redirection in a clustered environment.
+    pub async fn get_connection_info(&self) -> ConnectionInfo {
+        self.client.read().await.get_connection_info().await
+    }
 }
 
 #[async_trait]
diff --git a/core/sdk/src/leader_aware.rs b/core/sdk/src/leader_aware.rs
new file mode 100644
index 000000000..0a17b33b9
--- /dev/null
+++ b/core/sdk/src/leader_aware.rs
@@ -0,0 +1,217 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use iggy_binary_protocol::ClusterClient;
+use iggy_common::{
+    ClusterMetadata, ClusterNodeRole, ClusterNodeStatus, IggyError, 
IggyErrorDiscriminants,
+};
+use std::net::SocketAddr;
+use std::str::FromStr;
+use tracing::{debug, info, warn};
+
+/// Maximum number of leader redirections to prevent infinite loops
+const MAX_LEADER_REDIRECTS: u8 = 3;
+
+/// Check if we need to redirect to leader and return the leader address if 
redirection is needed
+pub async fn check_and_redirect_to_leader<C: ClusterClient>(
+    client: &C,
+    current_address: &str,
+) -> Result<Option<String>, IggyError> {
+    debug!("Checking cluster metadata for leader detection");
+
+    // Try to get cluster metadata
+    match client.get_cluster_metadata().await {
+        Ok(metadata) => {
+            debug!(
+                "Got cluster metadata: {} nodes, cluster: {}",
+                metadata.nodes.len(),
+                metadata.name
+            );
+            process_cluster_metadata(&metadata, current_address)
+        }
+        Err(e) if is_feature_unavailable_error(&e) => {
+            debug!("Cluster metadata feature unavailable - server doesn't 
support clustering");
+            Ok(None)
+        }
+        Err(e) => {
+            warn!("Failed to get cluster metadata: {}", e);
+            // Don't fail the connection, just continue with current node
+            Ok(None)
+        }
+    }
+}
+
+/// Process cluster metadata and determine if redirection is needed
+fn process_cluster_metadata(
+    metadata: &ClusterMetadata,
+    current_address: &str,
+) -> Result<Option<String>, IggyError> {
+    // Find active leader node
+    let leader = metadata
+        .nodes
+        .iter()
+        .find(|n| n.role == ClusterNodeRole::Leader && n.status == 
ClusterNodeStatus::Healthy);
+
+    match leader {
+        Some(leader_node) => {
+            info!(
+                "Found leader node: {} at {}",
+                leader_node.name, leader_node.address
+            );
+
+            // Check if we're already connected to the leader
+            if !is_same_address(current_address, &leader_node.address) {
+                info!(
+                    "Current connection to {} is not the leader, will redirect 
to {}",
+                    current_address, leader_node.address
+                );
+                Ok(Some(leader_node.address.clone()))
+            } else {
+                debug!("Already connected to leader at {}", current_address);
+                Ok(None)
+            }
+        }
+        None => {
+            warn!("No active leader found in cluster metadata");
+            // Continue with current connection if no leader is found
+            Ok(None)
+        }
+    }
+}
+
+/// Check if two addresses refer to the same endpoint
+/// Handles various formats like 127.0.0.1:8090 vs localhost:8090
+fn is_same_address(addr1: &str, addr2: &str) -> bool {
+    // Try to parse as socket addresses for comparison
+    match (parse_address(addr1), parse_address(addr2)) {
+        (Some(sock1), Some(sock2)) => {
+            // Compare IP and port
+            sock1.ip() == sock2.ip() && sock1.port() == sock2.port()
+        }
+        _ => {
+            // Fallback to string comparison if parsing fails
+            normalize_address(addr1) == normalize_address(addr2)
+        }
+    }
+}
+
+/// Parse address string to SocketAddr, handling various formats
+fn parse_address(addr: &str) -> Option<SocketAddr> {
+    // Try direct parsing
+    if let Ok(socket_addr) = SocketAddr::from_str(addr) {
+        return Some(socket_addr);
+    }
+
+    // Try with common replacements
+    let normalized = addr
+        .replace("localhost", "127.0.0.1")
+        .replace("[::]", "[::1]");
+
+    SocketAddr::from_str(&normalized).ok()
+}
+
+/// Normalize address string for comparison
+fn normalize_address(addr: &str) -> String {
+    addr.to_lowercase()
+        .replace("localhost", "127.0.0.1")
+        .replace("[::]", "[::1]")
+}
+
+/// Check if the error indicates that the feature is unavailable
+fn is_feature_unavailable_error(error: &IggyError) -> bool {
+    matches!(
+        error,
+        IggyError::FeatureUnavailable | IggyError::InvalidCommand
+    ) || error.as_code() == IggyErrorDiscriminants::FeatureUnavailable as u32
+}
+
+/// Struct to track leader redirection state
+#[derive(Debug, Clone)]
+pub struct LeaderRedirectionState {
+    pub redirect_count: u8,
+    pub last_leader_address: Option<String>,
+}
+
+impl LeaderRedirectionState {
+    pub fn new() -> Self {
+        Self {
+            redirect_count: 0,
+            last_leader_address: None,
+        }
+    }
+
+    pub fn can_redirect(&self) -> bool {
+        self.redirect_count < MAX_LEADER_REDIRECTS
+    }
+
+    pub fn increment_redirect(&mut self, leader_address: String) {
+        self.redirect_count += 1;
+        self.last_leader_address = Some(leader_address);
+    }
+
+    pub fn reset(&mut self) {
+        self.redirect_count = 0;
+        self.last_leader_address = None;
+    }
+}
+
+impl Default for LeaderRedirectionState {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_is_same_address() {
+        assert!(is_same_address("127.0.0.1:8090", "127.0.0.1:8090"));
+        assert!(is_same_address("localhost:8090", "127.0.0.1:8090"));
+        assert!(!is_same_address("127.0.0.1:8090", "127.0.0.1:8091"));
+        assert!(!is_same_address("192.168.1.1:8090", "127.0.0.1:8090"));
+    }
+
+    #[test]
+    fn test_normalize_address() {
+        assert_eq!(normalize_address("localhost:8090"), "127.0.0.1:8090");
+        assert_eq!(normalize_address("LOCALHOST:8090"), "127.0.0.1:8090");
+        assert_eq!(normalize_address("[::]:8090"), "[::1]:8090");
+    }
+
+    #[test]
+    fn test_leader_redirection_state() {
+        let mut state = LeaderRedirectionState::new();
+        assert!(state.can_redirect());
+        assert_eq!(state.redirect_count, 0);
+
+        state.increment_redirect("127.0.0.1:8090".to_string());
+        assert!(state.can_redirect());
+        assert_eq!(state.redirect_count, 1);
+
+        state.increment_redirect("127.0.0.1:8091".to_string());
+        state.increment_redirect("127.0.0.1:8092".to_string());
+        assert!(!state.can_redirect());
+        assert_eq!(state.redirect_count, 3);
+
+        state.reset();
+        assert!(state.can_redirect());
+        assert_eq!(state.redirect_count, 0);
+    }
+}
diff --git a/core/sdk/src/lib.rs b/core/sdk/src/lib.rs
index c68b7dd5a..59604e296 100644
--- a/core/sdk/src/lib.rs
+++ b/core/sdk/src/lib.rs
@@ -22,6 +22,7 @@ pub mod client_wrappers;
 pub mod clients;
 pub mod consumer_ext;
 pub mod http;
+mod leader_aware;
 pub mod prelude;
 pub mod quic;
 pub mod stream_builder;
diff --git a/core/sdk/src/prelude.rs b/core/sdk/src/prelude.rs
index d89afe86a..8780dd947 100644
--- a/core/sdk/src/prelude.rs
+++ b/core/sdk/src/prelude.rs
@@ -30,6 +30,7 @@
 pub use crate::client_provider;
 pub use crate::client_provider::ClientProviderConfig;
 pub use crate::client_wrappers::client_wrapper::ClientWrapper;
+pub use crate::client_wrappers::connection_info::ConnectionInfo;
 pub use crate::clients::client::IggyClient;
 pub use crate::clients::client_builder::IggyClientBuilder;
 pub use crate::clients::consumer::{
@@ -53,19 +54,19 @@ pub use iggy_binary_protocol::{
 };
 pub use iggy_common::{
     Aes256GcmEncryptor, Args, ArgsOptional, AutoLogin, BytesSerializable, 
CacheMetrics,
-    CacheMetricsKey, ClientError, ClientInfoDetails, CompressionAlgorithm, 
Consumer,
-    ConsumerGroupDetails, ConsumerKind, EncryptorKind, FlushUnsavedBuffer, 
GlobalPermissions,
-    HeaderKey, HeaderValue, HttpClientConfig, HttpClientConfigBuilder, IdKind, 
Identifier,
-    IdentityInfo, IggyByteSize, IggyDuration, IggyError, IggyExpiry, 
IggyIndexView, IggyMessage,
-    IggyMessageHeader, IggyMessageHeaderView, IggyMessageView, 
IggyMessageViewIterator,
-    IggyTimestamp, MaxTopicSize, Partition, Partitioner, Partitioning, 
Permissions,
-    PersonalAccessTokenExpiry, PollMessages, PolledMessages, PollingKind, 
PollingStrategy,
-    QuicClientConfig, QuicClientConfigBuilder, QuicClientReconnectionConfig, 
SendMessages,
-    Sizeable, SnapshotCompression, Stats, Stream, StreamDetails, 
StreamPermissions,
-    SystemSnapshotType, TcpClientConfig, TcpClientConfigBuilder, 
TcpClientReconnectionConfig,
-    Topic, TopicDetails, TopicPermissions, TransportProtocol, UserId, 
UserStatus, Validatable,
-    WebSocketClientConfig, WebSocketClientConfigBuilder, 
WebSocketClientReconnectionConfig,
-    defaults, locking,
+    CacheMetricsKey, ClientError, ClientInfoDetails, ClusterMetadata, 
ClusterNode, ClusterNodeRole,
+    ClusterNodeStatus, CompressionAlgorithm, Consumer, ConsumerGroupDetails, 
ConsumerKind,
+    EncryptorKind, FlushUnsavedBuffer, GlobalPermissions, HeaderKey, 
HeaderValue, HttpClientConfig,
+    HttpClientConfigBuilder, IdKind, Identifier, IdentityInfo, IggyByteSize, 
IggyDuration,
+    IggyError, IggyExpiry, IggyIndexView, IggyMessage, IggyMessageHeader, 
IggyMessageHeaderView,
+    IggyMessageView, IggyMessageViewIterator, IggyTimestamp, MaxTopicSize, 
Partition, Partitioner,
+    Partitioning, Permissions, PersonalAccessTokenExpiry, PollMessages, 
PolledMessages,
+    PollingKind, PollingStrategy, QuicClientConfig, QuicClientConfigBuilder,
+    QuicClientReconnectionConfig, SendMessages, Sizeable, SnapshotCompression, 
Stats, Stream,
+    StreamDetails, StreamPermissions, SystemSnapshotType, TcpClientConfig, 
TcpClientConfigBuilder,
+    TcpClientReconnectionConfig, Topic, TopicDetails, TopicPermissions, 
TransportProtocol, UserId,
+    UserStatus, Validatable, WebSocketClientConfig, 
WebSocketClientConfigBuilder,
+    WebSocketClientReconnectionConfig, defaults, locking,
 };
 pub use iggy_common::{
     IGGY_MESSAGE_CHECKSUM_OFFSET_RANGE, IGGY_MESSAGE_HEADER_SIZE,
diff --git a/core/sdk/src/quic/quic_client.rs b/core/sdk/src/quic/quic_client.rs
index 831890cb4..2f5e3c6d1 100644
--- a/core/sdk/src/quic/quic_client.rs
+++ b/core/sdk/src/quic/quic_client.rs
@@ -16,6 +16,7 @@
  * under the License.
  */
 
+use crate::leader_aware::{LeaderRedirectionState, 
check_and_redirect_to_leader};
 use crate::prelude::AutoLogin;
 use iggy_binary_protocol::{
     BinaryClient, BinaryTransport, Client, PersonalAccessTokenClient, 
UserClient,
@@ -28,7 +29,7 @@ use async_trait::async_trait;
 use bytes::Bytes;
 use iggy_common::{
     ClientState, Command, ConnectionString, ConnectionStringUtils, 
Credentials, DiagnosticEvent,
-    QuicConnectionStringOptions, TransportProtocol,
+    IggyErrorDiscriminants, QuicConnectionStringOptions, TransportProtocol,
 };
 use quinn::crypto::rustls::QuicClientConfig as QuinnQuicClientConfig;
 use quinn::{ClientConfig, Connection, Endpoint, IdleTimeout, RecvStream, 
VarInt};
@@ -51,10 +52,11 @@ pub struct QuicClient {
     pub(crate) endpoint: Endpoint,
     pub(crate) connection: Arc<Mutex<Option<Connection>>>,
     pub(crate) config: Arc<QuicClientConfig>,
-    pub(crate) server_address: SocketAddr,
     pub(crate) state: Mutex<ClientState>,
     events: (Sender<DiagnosticEvent>, Receiver<DiagnosticEvent>),
     connected_at: Mutex<Option<IggyTimestamp>>,
+    leader_redirection_state: Mutex<LeaderRedirectionState>,
+    pub(crate) current_server_address: Mutex<SocketAddr>,
 }
 
 unsafe impl Send for QuicClient {}
@@ -126,9 +128,10 @@ impl BinaryTransport for QuicClient {
         }
 
         self.disconnect().await?;
+        let server_address = 
self.current_server_address.lock().await.to_string();
         info!(
             "Reconnecting to the server: {}, by client: {}",
-            self.config.server_address, self.config.client_address
+            server_address, self.config.client_address
         );
         self.connect().await?;
         self.send_raw(code, payload).await
@@ -195,11 +198,12 @@ impl QuicClient {
         Ok(Self {
             config,
             endpoint,
-            server_address,
             connection: Arc::new(Mutex::new(None)),
             state: Mutex::new(ClientState::Disconnected),
             events: broadcast(1000),
             connected_at: Mutex::new(None),
+            leader_redirection_state: 
Mutex::new(LeaderRedirectionState::new()),
+            current_server_address: Mutex::new(server_address),
         })
     }
 
@@ -235,11 +239,20 @@ impl QuicClient {
                 .map_err(|_| IggyError::InvalidNumberEncoding)?,
         );
         if status != 0 {
-            error!(
-                "Received an invalid response with status: {} ({}).",
-                status,
-                IggyError::from_code_as_string(status)
-            );
+            // Log FeatureUnavailable as debug instead of error (e.g., when 
clustering is disabled)
+            if status == IggyErrorDiscriminants::FeatureUnavailable as u32 {
+                tracing::debug!(
+                    "Feature unavailable on server: {} ({})",
+                    status,
+                    IggyError::from_code_as_string(status)
+                );
+            } else {
+                error!(
+                    "Received an invalid response with status: {} ({}).",
+                    status,
+                    IggyError::from_code_as_string(status)
+                );
+            }
 
             return Err(IggyError::from_code(status));
         }
@@ -260,135 +273,189 @@ impl QuicClient {
     }
 
     async fn connect(&self) -> Result<(), IggyError> {
-        match self.get_state().await {
-            ClientState::Shutdown => {
-                trace!("Cannot connect. Client is shutdown.");
-                return Err(IggyError::ClientShutdown);
-            }
-            ClientState::Connected | ClientState::Authenticating | 
ClientState::Authenticated => {
-                trace!("Client is already connected.");
-                return Ok(());
-            }
-            ClientState::Connecting => {
-                trace!("Client is already connecting.");
-                return Ok(());
-            }
-            _ => {}
-        }
-
-        self.set_state(ClientState::Connecting).await;
-        if let Some(connected_at) = self.connected_at.lock().await.as_ref() {
-            let now = IggyTimestamp::now();
-            let elapsed = now.as_micros() - connected_at.as_micros();
-            let interval = 
self.config.reconnection.reestablish_after.as_micros();
-            trace!(
-                "Elapsed time since last connection: {}",
-                IggyDuration::from(elapsed)
-            );
-            if elapsed < interval {
-                let remaining = IggyDuration::from(interval - elapsed);
-                info!("Trying to connect to the server in: {remaining}",);
-                sleep(remaining.get_duration()).await;
-            }
-        }
+        self.connect_with_leader_detection().await
+    }
 
-        let mut retry_count = 0;
-        let connection;
-        let remote_address;
+    async fn connect_with_leader_detection(&self) -> Result<(), IggyError> {
+        // Leader detection loop
         loop {
-            info!(
-                "{NAME} client is connecting to server: {}...",
-                self.config.server_address
-            );
-            let connection_result = self
-                .endpoint
-                .connect(self.server_address, &self.config.server_name)
-                .unwrap()
-                .await;
-
-            if connection_result.is_err() {
-                error!(
-                    "Failed to connect to server: {}",
-                    self.config.server_address
-                );
-                if !self.config.reconnection.enabled {
-                    warn!("Automatic reconnection is disabled.");
-                    return Err(IggyError::CannotEstablishConnection);
+            match self.get_state().await {
+                ClientState::Shutdown => {
+                    trace!("Cannot connect. Client is shutdown.");
+                    return Err(IggyError::ClientShutdown);
                 }
-
-                let unlimited_retries = 
self.config.reconnection.max_retries.is_none();
-                let max_retries = 
self.config.reconnection.max_retries.unwrap_or_default();
-                let max_retries_str =
-                    if let Some(max_retries) = 
self.config.reconnection.max_retries {
-                        max_retries.to_string()
-                    } else {
-                        "unlimited".to_string()
-                    };
-
-                let interval_str = 
self.config.reconnection.interval.as_human_time_string();
-                if unlimited_retries || retry_count < max_retries {
-                    retry_count += 1;
-                    info!(
-                        "Retrying to connect to server 
({retry_count}/{max_retries_str}): {} in: {interval_str}",
-                        self.config.server_address,
-                    );
-                    
sleep(self.config.reconnection.interval.get_duration()).await;
-                    continue;
+                ClientState::Connected
+                | ClientState::Authenticating
+                | ClientState::Authenticated => {
+                    trace!("Client is already connected.");
+                    return Ok(());
                 }
-
-                self.set_state(ClientState::Disconnected).await;
-                self.publish_event(DiagnosticEvent::Disconnected).await;
-                return Err(IggyError::CannotEstablishConnection);
+                ClientState::Connecting => {
+                    trace!("Client is already connecting.");
+                    return Ok(());
+                }
+                _ => {}
             }
 
-            connection = connection_result.map_err(|error| {
-                error!("Failed to establish QUIC connection: {error}");
-                IggyError::CannotEstablishConnection
-            })?;
-            remote_address = connection.remote_address();
-            break;
-        }
-
-        let now = IggyTimestamp::now();
-        info!("{NAME} client has connected to server: {remote_address} at 
{now}",);
-        self.set_state(ClientState::Connected).await;
-        self.connection.lock().await.replace(connection);
-        self.connected_at.lock().await.replace(now);
-        self.publish_event(DiagnosticEvent::Connected).await;
-
-        match &self.config.auto_login {
-            AutoLogin::Disabled => {
-                info!("Automatic sign-in is disabled.");
-                Ok(())
+            self.set_state(ClientState::Connecting).await;
+            if let Some(connected_at) = 
self.connected_at.lock().await.as_ref() {
+                let now = IggyTimestamp::now();
+                let elapsed = now.as_micros() - connected_at.as_micros();
+                let interval = 
self.config.reconnection.reestablish_after.as_micros();
+                trace!(
+                    "Elapsed time since last connection: {}",
+                    IggyDuration::from(elapsed)
+                );
+                if elapsed < interval {
+                    let remaining = IggyDuration::from(interval - elapsed);
+                    info!("Trying to connect to the server in: {remaining}",);
+                    sleep(remaining.get_duration()).await;
+                }
             }
-            AutoLogin::Enabled(credentials) => {
+
+            let mut retry_count = 0;
+            let connection;
+            let remote_address;
+            loop {
+                let server_address = *self.current_server_address.lock().await;
                 info!(
-                    "{NAME} client: {} is signing in...",
-                    self.config.client_address
+                    "{NAME} client is connecting to server: {}...",
+                    server_address
                 );
-                self.set_state(ClientState::Authenticating).await;
-                match credentials {
-                    Credentials::UsernamePassword(username, password) => {
-                        self.login_user(username, password).await?;
-                        self.publish_event(DiagnosticEvent::SignedIn).await;
-                        info!(
-                            "{NAME} client: {} has signed in with the user 
credentials, username: {username}",
-                            self.config.client_address
-                        );
-                        Ok(())
+                let connection_result = self
+                    .endpoint
+                    .connect(server_address, &self.config.server_name)
+                    .unwrap()
+                    .await;
+
+                if connection_result.is_err() {
+                    error!("Failed to connect to server: {}", server_address);
+                    if !self.config.reconnection.enabled {
+                        warn!("Automatic reconnection is disabled.");
+                        return Err(IggyError::CannotEstablishConnection);
                     }
-                    Credentials::PersonalAccessToken(token) => {
-                        self.login_with_personal_access_token(token).await?;
-                        self.publish_event(DiagnosticEvent::SignedIn).await;
+
+                    let unlimited_retries = 
self.config.reconnection.max_retries.is_none();
+                    let max_retries = 
self.config.reconnection.max_retries.unwrap_or_default();
+                    let max_retries_str =
+                        if let Some(max_retries) = 
self.config.reconnection.max_retries {
+                            max_retries.to_string()
+                        } else {
+                            "unlimited".to_string()
+                        };
+
+                    let interval_str = 
self.config.reconnection.interval.as_human_time_string();
+                    if unlimited_retries || retry_count < max_retries {
+                        retry_count += 1;
                         info!(
-                            "{NAME} client: {} has signed in with a personal 
access token.",
-                            self.config.client_address
+                            "Retrying to connect to server 
({retry_count}/{max_retries_str}): {} in: {interval_str}",
+                            server_address,
                         );
-                        Ok(())
+                        
sleep(self.config.reconnection.interval.get_duration()).await;
+                        continue;
                     }
+
+                    self.set_state(ClientState::Disconnected).await;
+                    self.publish_event(DiagnosticEvent::Disconnected).await;
+                    return Err(IggyError::CannotEstablishConnection);
                 }
+
+                connection = connection_result.map_err(|error| {
+                    error!("Failed to establish QUIC connection: {error}");
+                    IggyError::CannotEstablishConnection
+                })?;
+                remote_address = connection.remote_address();
+                break;
             }
-        }
+
+            let now = IggyTimestamp::now();
+            info!("{NAME} client has connected to server: {remote_address} at 
{now}",);
+            self.set_state(ClientState::Connected).await;
+            self.connection.lock().await.replace(connection);
+            self.connected_at.lock().await.replace(now);
+            self.publish_event(DiagnosticEvent::Connected).await;
+
+            // Handle auto-login
+            let should_redirect = match &self.config.auto_login {
+                AutoLogin::Disabled => {
+                    info!("Automatic sign-in is disabled.");
+                    false
+                }
+                AutoLogin::Enabled(credentials) => {
+                    info!(
+                        "{NAME} client: {} is signing in...",
+                        self.config.client_address
+                    );
+                    self.set_state(ClientState::Authenticating).await;
+                    match credentials {
+                        Credentials::UsernamePassword(username, password) => {
+                            self.login_user(username, password).await?;
+                            
self.publish_event(DiagnosticEvent::SignedIn).await;
+                            info!(
+                                "{NAME} client: {} has signed in with the user 
credentials, username: {username}",
+                                self.config.client_address
+                            );
+                        }
+                        Credentials::PersonalAccessToken(token) => {
+                            
self.login_with_personal_access_token(token).await?;
+                            
self.publish_event(DiagnosticEvent::SignedIn).await;
+                            info!(
+                                "{NAME} client: {} has signed in with a 
personal access token.",
+                                self.config.client_address
+                            );
+                        }
+                    }
+
+                    // Check if we need to redirect to leader
+                    let current_address = 
self.current_server_address.lock().await.to_string();
+                    let leader_address =
+                        check_and_redirect_to_leader(self, 
&current_address).await?;
+                    if let Some(new_leader_address) = leader_address {
+                        let mut redirection_state = 
self.leader_redirection_state.lock().await;
+                        if !redirection_state.can_redirect() {
+                            warn!(
+                                "Maximum leader redirections reached, 
continuing with current connection"
+                            );
+                            false
+                        } else {
+                            info!(
+                                "Current node is not leader, redirecting to 
leader at: {}",
+                                new_leader_address
+                            );
+                            
redirection_state.increment_redirect(new_leader_address.clone());
+                            drop(redirection_state);
+
+                            // Disconnect from current node
+                            self.disconnect().await?;
+
+                            // Update current server address - parse the new 
address
+                            let new_socket_addr =
+                                
new_leader_address.parse::<SocketAddr>().map_err(|e| {
+                                    error!(
+                                        "Failed to parse leader address {}: 
{}",
+                                        new_leader_address, e
+                                    );
+                                    IggyError::InvalidServerAddress
+                                })?;
+                            *self.current_server_address.lock().await = 
new_socket_addr;
+
+                            true // Need to redirect
+                        }
+                    } else {
+                        // Reset redirection state on successful connection to 
leader
+                        self.leader_redirection_state.lock().await.reset();
+                        false
+                    }
+                }
+            };
+
+            if should_redirect {
+                // Continue to next iteration of the loop to reconnect to 
leader
+                continue;
+            }
+
+            return Ok(());
+        } // End of leader detection loop
     }
 
     async fn shutdown(&self) -> Result<(), IggyError> {
diff --git a/core/sdk/src/tcp/tcp_client.rs b/core/sdk/src/tcp/tcp_client.rs
index c4adc3f5f..e57eb2ccc 100644
--- a/core/sdk/src/tcp/tcp_client.rs
+++ b/core/sdk/src/tcp/tcp_client.rs
@@ -16,6 +16,7 @@
  * under the License.
  */
 
+use crate::leader_aware::{LeaderRedirectionState, 
check_and_redirect_to_leader};
 use crate::prelude::Client;
 use crate::prelude::TcpClientConfig;
 use crate::tcp::tcp_connection_stream::TcpConnectionStream;
@@ -54,6 +55,8 @@ pub struct TcpClient {
     client_address: Mutex<Option<SocketAddr>>,
     events: (Sender<DiagnosticEvent>, Receiver<DiagnosticEvent>),
     connected_at: Mutex<Option<IggyTimestamp>>,
+    leader_redirection_state: Mutex<LeaderRedirectionState>,
+    pub(crate) current_server_address: Mutex<String>,
 }
 
 impl Default for TcpClient {
@@ -129,9 +132,10 @@ impl BinaryTransport for TcpClient {
 
         {
             let client_address = self.get_client_address_value().await;
+            let server_address = 
self.current_server_address.lock().await.clone();
             info!(
                 "Reconnecting to the server: {} by client: 
{client_address}...",
-                self.config.server_address
+                server_address
             );
         }
 
@@ -191,6 +195,7 @@ impl TcpClient {
 
     /// Create a new TCP client based on the provided configuration.
     pub fn create(config: Arc<TcpClientConfig>) -> Result<Self, IggyError> {
+        let server_address = config.server_address.clone();
         Ok(Self {
             config,
             client_address: Mutex::new(None),
@@ -198,6 +203,8 @@ impl TcpClient {
             state: Mutex::new(ClientState::Disconnected),
             events: broadcast(1000),
             connected_at: Mutex::new(None),
+            leader_redirection_state: 
Mutex::new(LeaderRedirectionState::new()),
+            current_server_address: Mutex::new(server_address),
         })
     }
 
@@ -219,6 +226,13 @@ impl TcpClient {
                     status,
                     IggyError::from_code_as_string(status)
                 )
+            } else if status == IggyErrorDiscriminants::FeatureUnavailable as 
u32 {
+                // Feature unavailable - likely clustering is disabled on 
server
+                tracing::debug!(
+                    "Feature unavailable on server: {} ({})",
+                    status,
+                    IggyError::from_code_as_string(status)
+                )
             } else {
                 error!(
                     "Received an invalid response with status: {} ({}).",
@@ -242,209 +256,257 @@ impl TcpClient {
     }
 
     async fn connect(&self) -> Result<(), IggyError> {
-        match self.get_state().await {
-            ClientState::Shutdown => {
-                trace!("Cannot connect. Client is shutdown.");
-                return Err(IggyError::ClientShutdown);
-            }
-            ClientState::Connected | ClientState::Authenticating | 
ClientState::Authenticated => {
-                let client_address = self.get_client_address_value().await;
-                trace!("Client: {client_address} is already connected.");
-                return Ok(());
-            }
-            ClientState::Connecting => {
-                trace!("Client is already connecting.");
-                return Ok(());
-            }
-            _ => {}
-        }
-
-        self.set_state(ClientState::Connecting).await;
-        if let Some(connected_at) = self.connected_at.lock().await.as_ref() {
-            let now = IggyTimestamp::now();
-            let elapsed = now.as_micros() - connected_at.as_micros();
-            let interval = 
self.config.reconnection.reestablish_after.as_micros();
-            trace!(
-                "Elapsed time since last connection: {}",
-                IggyDuration::from(elapsed)
-            );
-            if elapsed < interval {
-                let remaining = IggyDuration::from(interval - elapsed);
-                info!("Trying to connect to the server in: {remaining}",);
-                sleep(remaining.get_duration()).await;
-            }
-        }
+        self.connect_with_leader_detection().await
+    }
 
-        let tls_enabled = self.config.tls_enabled;
-        let mut retry_count = 0;
-        let connection_stream: ConnectionStreamKind;
-        let remote_address;
-        let client_address;
+    async fn connect_with_leader_detection(&self) -> Result<(), IggyError> {
+        // Leader detection loop
         loop {
-            info!(
-                "{NAME} client is connecting to server: {}...",
-                self.config.server_address
-            );
+            match self.get_state().await {
+                ClientState::Shutdown => {
+                    trace!("Cannot connect. Client is shutdown.");
+                    return Err(IggyError::ClientShutdown);
+                }
+                ClientState::Connected
+                | ClientState::Authenticating
+                | ClientState::Authenticated => {
+                    let client_address = self.get_client_address_value().await;
+                    trace!("Client: {client_address} is already connected.");
+                    return Ok(());
+                }
+                ClientState::Connecting => {
+                    trace!("Client is already connecting.");
+                    return Ok(());
+                }
+                _ => {}
+            }
 
-            let connection = 
TcpStream::connect(&self.config.server_address).await;
-            if let Err(err) = &connection {
-                error!(
-                    "Failed to connect to server: {}. Error: {}",
-                    self.config.server_address, err
+            self.set_state(ClientState::Connecting).await;
+            if let Some(connected_at) = 
self.connected_at.lock().await.as_ref() {
+                let now = IggyTimestamp::now();
+                let elapsed = now.as_micros() - connected_at.as_micros();
+                let interval = 
self.config.reconnection.reestablish_after.as_micros();
+                trace!(
+                    "Elapsed time since last connection: {}",
+                    IggyDuration::from(elapsed)
                 );
-                if !self.config.reconnection.enabled {
-                    warn!("Automatic reconnection is disabled.");
-                    return Err(IggyError::CannotEstablishConnection);
+                if elapsed < interval {
+                    let remaining = IggyDuration::from(interval - elapsed);
+                    info!("Trying to connect to the server in: {remaining}",);
+                    sleep(remaining.get_duration()).await;
                 }
+            }
 
-                let unlimited_retries = 
self.config.reconnection.max_retries.is_none();
-                let max_retries = 
self.config.reconnection.max_retries.unwrap_or_default();
-                let max_retries_str =
-                    if let Some(max_retries) = 
self.config.reconnection.max_retries {
-                        max_retries.to_string()
-                    } else {
-                        "unlimited".to_string()
-                    };
-
-                let interval_str = 
self.config.reconnection.interval.as_human_time_string();
-                if unlimited_retries || retry_count < max_retries {
-                    retry_count += 1;
-                    info!(
-                        "Retrying to connect to server 
({retry_count}/{max_retries_str}): {} in: {interval_str}",
-                        self.config.server_address,
+            let tls_enabled = self.config.tls_enabled;
+            let mut retry_count = 0;
+            let connection_stream: ConnectionStreamKind;
+            let remote_address;
+            let client_address;
+            loop {
+                let server_address = 
self.current_server_address.lock().await.clone();
+                info!(
+                    "{NAME} client is connecting to server: {}...",
+                    server_address
+                );
+
+                let connection = TcpStream::connect(&server_address).await;
+                if let Err(err) = &connection {
+                    error!(
+                        "Failed to connect to server: {}. Error: {}",
+                        server_address, err
                     );
-                    
sleep(self.config.reconnection.interval.get_duration()).await;
-                    continue;
+                    if !self.config.reconnection.enabled {
+                        warn!("Automatic reconnection is disabled.");
+                        return Err(IggyError::CannotEstablishConnection);
+                    }
+
+                    let unlimited_retries = 
self.config.reconnection.max_retries.is_none();
+                    let max_retries = 
self.config.reconnection.max_retries.unwrap_or_default();
+                    let max_retries_str =
+                        if let Some(max_retries) = 
self.config.reconnection.max_retries {
+                            max_retries.to_string()
+                        } else {
+                            "unlimited".to_string()
+                        };
+
+                    let interval_str = 
self.config.reconnection.interval.as_human_time_string();
+                    if unlimited_retries || retry_count < max_retries {
+                        retry_count += 1;
+                        info!(
+                            "Retrying to connect to server 
({retry_count}/{max_retries_str}): {} in: {interval_str}",
+                            server_address,
+                        );
+                        
sleep(self.config.reconnection.interval.get_duration()).await;
+                        continue;
+                    }
+
+                    self.set_state(ClientState::Disconnected).await;
+                    self.publish_event(DiagnosticEvent::Disconnected).await;
+                    return Err(IggyError::CannotEstablishConnection);
                 }
 
-                self.set_state(ClientState::Disconnected).await;
-                self.publish_event(DiagnosticEvent::Disconnected).await;
-                return Err(IggyError::CannotEstablishConnection);
-            }
+                let stream = connection.map_err(|error| {
+                    error!("Failed to establish TCP connection to the server: 
{error}",);
+                    IggyError::CannotEstablishConnection
+                })?;
+                client_address = stream.local_addr().map_err(|error| {
+                    error!("Failed to get the local address of the client: 
{error}",);
+                    IggyError::CannotEstablishConnection
+                })?;
+                remote_address = stream.peer_addr().map_err(|error| {
+                    error!("Failed to get the remote address of the server: 
{error}",);
+                    IggyError::CannotEstablishConnection
+                })?;
+                self.client_address.lock().await.replace(client_address);
 
-            let stream = connection.map_err(|error| {
-                error!("Failed to establish TCP connection to the server: 
{error}",);
-                IggyError::CannotEstablishConnection
-            })?;
-            client_address = stream.local_addr().map_err(|error| {
-                error!("Failed to get the local address of the client: 
{error}",);
-                IggyError::CannotEstablishConnection
-            })?;
-            remote_address = stream.peer_addr().map_err(|error| {
-                error!("Failed to get the remote address of the server: 
{error}",);
-                IggyError::CannotEstablishConnection
-            })?;
-            self.client_address.lock().await.replace(client_address);
-
-            if let Err(e) = stream.set_nodelay(self.config.nodelay) {
-                error!("Failed to set the nodelay option on the client: {e}, 
continuing...",);
-            }
+                if let Err(e) = stream.set_nodelay(self.config.nodelay) {
+                    error!("Failed to set the nodelay option on the client: 
{e}, continuing...",);
+                }
 
-            if !tls_enabled {
-                connection_stream =
-                    
ConnectionStreamKind::Tcp(TcpConnectionStream::new(client_address, stream));
-                break;
-            }
+                if !tls_enabled {
+                    connection_stream =
+                        
ConnectionStreamKind::Tcp(TcpConnectionStream::new(client_address, stream));
+                    break;
+                }
 
-            let _ = 
rustls::crypto::aws_lc_rs::default_provider().install_default();
-
-            let config = if self.config.tls_validate_certificate {
-                let mut root_cert_store = rustls::RootCertStore::empty();
-                if let Some(certificate_path) = &self.config.tls_ca_file {
-                    for cert in
-                        
CertificateDer::pem_file_iter(certificate_path).map_err(|error| {
-                            error!("Failed to read the CA file: 
{certificate_path}. {error}",);
-                            IggyError::InvalidTlsCertificatePath
-                        })?
-                    {
-                        let certificate = cert.map_err(|error| {
+                let _ = 
rustls::crypto::aws_lc_rs::default_provider().install_default();
+
+                let config = if self.config.tls_validate_certificate {
+                    let mut root_cert_store = rustls::RootCertStore::empty();
+                    if let Some(certificate_path) = &self.config.tls_ca_file {
+                        for cert in
+                            
CertificateDer::pem_file_iter(certificate_path).map_err(|error| {
+                                error!("Failed to read the CA file: 
{certificate_path}. {error}",);
+                                IggyError::InvalidTlsCertificatePath
+                            })?
+                        {
+                            let certificate = cert.map_err(|error| {
                             error!(
                                 "Failed to read a certificate from the CA 
file: {certificate_path}. {error}",
                             );
                             IggyError::InvalidTlsCertificate
                         })?;
-                        root_cert_store.add(certificate).map_err(|error| {
+                            root_cert_store.add(certificate).map_err(|error| {
                             error!(
                                 "Failed to add a certificate to the root 
certificate store. {error}",
                             );
                             IggyError::InvalidTlsCertificate
                         })?;
+                        }
+                    } else {
+                        
root_cert_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
                     }
+
+                    rustls::ClientConfig::builder()
+                        .with_root_certificates(root_cert_store)
+                        .with_no_client_auth()
+                } else {
+                    use crate::tcp::tcp_tls_verifier::NoServerVerification;
+                    rustls::ClientConfig::builder()
+                        .dangerous()
+                        
.with_custom_certificate_verifier(Arc::new(NoServerVerification))
+                        .with_no_client_auth()
+                };
+                let connector = TlsConnector::from(Arc::new(config));
+                let tls_domain = if self.config.tls_domain.is_empty() {
+                    // Extract hostname/IP from server_address when tls_domain 
is not specified
+                    server_address
+                        .split(':')
+                        .next()
+                        .unwrap_or(&server_address)
+                        .to_string()
                 } else {
-                    
root_cert_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
+                    self.config.tls_domain.to_owned()
+                };
+                let domain = ServerName::try_from(tls_domain).map_err(|error| {
+                    error!("Failed to create a server name from the domain. 
{error}",);
+                    IggyError::InvalidTlsDomain
+                })?;
+                let stream = connector.connect(domain, 
stream).await.map_err(|error| {
+                    error!("Failed to establish a TLS connection to the 
server: {error}",);
+                    IggyError::CannotEstablishConnection
+                })?;
+                connection_stream = 
ConnectionStreamKind::TcpTls(TcpTlsConnectionStream::new(
+                    client_address,
+                    TlsStream::Client(stream),
+                ));
+                break;
+            }
+
+            let now = IggyTimestamp::now();
+            info!(
+                "{NAME} client: {client_address} has connected to server: 
{remote_address} at: {now}",
+            );
+            self.stream.lock().await.replace(connection_stream);
+            self.set_state(ClientState::Connected).await;
+            self.connected_at.lock().await.replace(now);
+            self.publish_event(DiagnosticEvent::Connected).await;
+            // Handle auto-login
+            let should_redirect = match &self.config.auto_login {
+                AutoLogin::Disabled => {
+                    info!("Automatic sign-in is disabled.");
+                    false
                 }
+                AutoLogin::Enabled(credentials) => {
+                    info!("{NAME} client: {client_address} is signing in...");
+                    self.set_state(ClientState::Authenticating).await;
+                    match credentials {
+                        Credentials::UsernamePassword(username, password) => {
+                            self.login_user(username, password).await?;
+                            info!(
+                                "{NAME} client: {client_address} has signed in 
with the user credentials, username: {username}",
+                            );
+                        }
+                        Credentials::PersonalAccessToken(token) => {
+                            
self.login_with_personal_access_token(token).await?;
+                            info!(
+                                "{NAME} client: {client_address} has signed in 
with a personal access token.",
+                            );
+                        }
+                    }
 
-                rustls::ClientConfig::builder()
-                    .with_root_certificates(root_cert_store)
-                    .with_no_client_auth()
-            } else {
-                use crate::tcp::tcp_tls_verifier::NoServerVerification;
-                rustls::ClientConfig::builder()
-                    .dangerous()
-                    
.with_custom_certificate_verifier(Arc::new(NoServerVerification))
-                    .with_no_client_auth()
-            };
-            let connector = TlsConnector::from(Arc::new(config));
-            let tls_domain = if self.config.tls_domain.is_empty() {
-                // Extract hostname/IP from server_address when tls_domain is 
not specified
-                self.config
-                    .server_address
-                    .split(':')
-                    .next()
-                    .unwrap_or(&self.config.server_address)
-                    .to_string()
-            } else {
-                self.config.tls_domain.to_owned()
-            };
-            let domain = ServerName::try_from(tls_domain).map_err(|error| {
-                error!("Failed to create a server name from the domain. 
{error}",);
-                IggyError::InvalidTlsDomain
-            })?;
-            let stream = connector.connect(domain, 
stream).await.map_err(|error| {
-                error!("Failed to establish a TLS connection to the server: 
{error}",);
-                IggyError::CannotEstablishConnection
-            })?;
-            connection_stream = 
ConnectionStreamKind::TcpTls(TcpTlsConnectionStream::new(
-                client_address,
-                TlsStream::Client(stream),
-            ));
-            break;
-        }
+                    // Check if we need to redirect to leader
+                    let current_address = 
self.current_server_address.lock().await.clone();
+                    let leader_address =
+                        check_and_redirect_to_leader(self, 
&current_address).await?;
+                    if let Some(new_leader_address) = leader_address {
+                        let mut redirection_state = 
self.leader_redirection_state.lock().await;
+                        if !redirection_state.can_redirect() {
+                            warn!(
+                                "Maximum leader redirections reached, 
continuing with current connection"
+                            );
+                            false
+                        } else {
+                            info!(
+                                "Current node is not leader, redirecting to 
leader at: {}",
+                                new_leader_address
+                            );
+                            
redirection_state.increment_redirect(new_leader_address.clone());
+                            drop(redirection_state);
 
-        let now = IggyTimestamp::now();
-        info!(
-            "{NAME} client: {client_address} has connected to server: 
{remote_address} at: {now}",
-        );
-        self.stream.lock().await.replace(connection_stream);
-        self.set_state(ClientState::Connected).await;
-        self.connected_at.lock().await.replace(now);
-        self.publish_event(DiagnosticEvent::Connected).await;
-        match &self.config.auto_login {
-            AutoLogin::Disabled => {
-                info!("Automatic sign-in is disabled.");
-                Ok(())
-            }
-            AutoLogin::Enabled(credentials) => {
-                info!("{NAME} client: {client_address} is signing in...");
-                self.set_state(ClientState::Authenticating).await;
-                match credentials {
-                    Credentials::UsernamePassword(username, password) => {
-                        self.login_user(username, password).await?;
-                        info!(
-                            "{NAME} client: {client_address} has signed in 
with the user credentials, username: {username}",
-                        );
-                        Ok(())
-                    }
-                    Credentials::PersonalAccessToken(token) => {
-                        self.login_with_personal_access_token(token).await?;
-                        info!(
-                            "{NAME} client: {client_address} has signed in 
with a personal access token.",
-                        );
-                        Ok(())
+                            // Disconnect from current node
+                            self.disconnect().await?;
+
+                            // Update current server address
+                            *self.current_server_address.lock().await = 
new_leader_address;
+
+                            true // Need to redirect
+                        }
+                    } else {
+                        // Reset redirection state on successful connection to 
leader
+                        self.leader_redirection_state.lock().await.reset();
+                        false
                     }
                 }
+            };
+
+            if should_redirect {
+                // Continue to next iteration of the loop to reconnect to 
leader
+                continue;
             }
-        }
+
+            return Ok(());
+        } // End of leader detection loop
     }
 
     async fn disconnect(&self) -> Result<(), IggyError> {
diff --git a/core/server/src/args.rs b/core/server/src/args.rs
index 3a6cfdb96..64cddb1e7 100644
--- a/core/server/src/args.rs
+++ b/core/server/src/args.rs
@@ -105,4 +105,21 @@ pub struct Args {
     ///   iggy-server --with-default-root-credentials     # Use 'iggy/iggy' as 
root credentials
     #[arg(long, default_value_t = false, verbatim_doc_comment)]
     pub with_default_root_credentials: bool,
+
+    /// Run server as a follower node (FOR TESTING LEADER REDIRECTION)
+    ///
+    /// When this flag is set, the server will report itself as a follower node
+    /// in cluster metadata responses. This is useful for testing leader-aware
+    /// client connections and redirection logic.
+    ///
+    /// The server will return mock cluster metadata showing:
+    /// - This server as a follower node
+    /// - A hypothetical leader node on port 8090 (or configured TCP port)
+    ///
+    /// Examples:
+    ///   iggy-server                                      # Run as leader 
(default)
+    ///   iggy-server --follower                           # Run as follower
+    ///   IGGY_TCP_ADDRESS=127.0.0.1:8091 iggy-server --follower  # Follower 
on port 8091
+    #[arg(long, default_value_t = false, verbatim_doc_comment)]
+    pub follower: bool,
 }
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index becbfaf50..896eace55 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -82,6 +82,7 @@ async fn main() -> Result<(), ServerError> {
         );
     }
     let args = Args::parse();
+    let is_follower = args.follower;
 
     // FIRST DISCRETE LOADING STEP.
     // Load config and create directories.
@@ -115,6 +116,10 @@ async fn main() -> Result<(), ServerError> {
     // From this point on, we can use tracing macros to log messages.
     logging.late_init(config.system.get_system_path(), 
&config.system.logging)?;
 
+    if is_follower {
+        info!("Server is running in FOLLOWER mode for testing leader 
redirection");
+    }
+
     if args.with_default_root_credentials {
         let username_set = std::env::var("IGGY_ROOT_USERNAME").is_ok();
         let password_set = std::env::var("IGGY_ROOT_PASSWORD").is_ok();
@@ -357,6 +362,7 @@ async fn main() -> Result<(), ServerError> {
                         .encryptor(encryptor)
                         .version(current_version)
                         .metrics(metrics)
+                        .is_follower(is_follower)
                         .build();
                     let shard = Rc::new(shard);
 
diff --git a/core/server/src/quic/listener.rs b/core/server/src/quic/listener.rs
index 57140e68a..8f842fecf 100644
--- a/core/server/src/quic/listener.rs
+++ b/core/server/src/quic/listener.rs
@@ -25,8 +25,7 @@ use crate::streaming::session::Session;
 use anyhow::anyhow;
 use compio_quic::{Connection, Endpoint, RecvStream, SendStream};
 use futures::FutureExt;
-use iggy_common::IggyError;
-use iggy_common::TransportProtocol;
+use iggy_common::{GET_CLUSTER_METADATA_CODE, IggyError, TransportProtocol};
 use std::rc::Rc;
 use tracing::{debug, error, info, trace};
 
@@ -198,21 +197,32 @@ async fn handle_stream(
             Ok(())
         }
         Err(e) => {
-            error!(
-                "Command was not handled successfully, session: {:?}, error: 
{e}.",
-                session
-            );
-            // Only return a connection-terminating error for client not found
-            if let IggyError::ClientNotFound(_) = e {
-                sender.send_error_response(e.clone()).await?;
-                trace!("QUIC error response was sent.");
-                error!("Session will be deleted.");
-                Err(anyhow!("Client not found: {e}"))
-            } else {
-                // For all other errors, send response and continue the 
connection
+            // Special handling for GetClusterMetadata when clustering is 
disabled
+            if code == GET_CLUSTER_METADATA_CODE && matches!(e, 
IggyError::FeatureUnavailable) {
+                debug!(
+                    "GetClusterMetadata command not available (clustering 
disabled), session: {:?}.",
+                    session
+                );
                 sender.send_error_response(e).await?;
                 trace!("QUIC error response was sent.");
                 Ok(())
+            } else {
+                error!(
+                    "Command was not handled successfully, session: {:?}, 
error: {e}.",
+                    session
+                );
+                // Only return a connection-terminating error for client not 
found
+                if let IggyError::ClientNotFound(_) = e {
+                    sender.send_error_response(e.clone()).await?;
+                    trace!("QUIC error response was sent.");
+                    error!("Session will be deleted.");
+                    Err(anyhow!("Client not found: {e}"))
+                } else {
+                    // For all other errors, send response and continue the 
connection
+                    sender.send_error_response(e).await?;
+                    trace!("QUIC error response was sent.");
+                    Ok(())
+                }
             }
         }
     }
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index 37e3099ec..98914543e 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -48,6 +48,7 @@ pub struct IggyShardBuilder {
     encryptor: Option<EncryptorKind>,
     version: Option<SemanticVersion>,
     metrics: Option<Metrics>,
+    is_follower: bool,
 }
 
 impl IggyShardBuilder {
@@ -109,6 +110,11 @@ impl IggyShardBuilder {
         self
     }
 
+    pub fn is_follower(mut self, is_follower: bool) -> Self {
+        self.is_follower = is_follower;
+        self
+    }
+
     // TODO: Too much happens in there, some of those bootstrapping logic 
should be moved outside.
     pub fn build(self) -> IggyShard {
         let id = self.id.unwrap();
@@ -161,6 +167,7 @@ impl IggyShardBuilder {
             stop_receiver,
             messages_receiver: Cell::new(Some(frame_receiver)),
             metrics,
+            is_follower: self.is_follower,
             is_shutting_down: AtomicBool::new(false),
             tcp_bound_address: Cell::new(None),
             quic_bound_address: Cell::new(None),
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 9d272c417..64243f51e 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -86,6 +86,7 @@ pub struct IggyShard {
     pub(crate) permissioner: RefCell<Permissioner>,
     pub(crate) users: Users,
     pub(crate) metrics: Metrics,
+    pub(crate) is_follower: bool,
     pub messages_receiver: Cell<Option<Receiver<ShardFrame>>>,
     pub(crate) stop_receiver: StopReceiver,
     pub(crate) is_shutting_down: AtomicBool,
diff --git a/core/server/src/shard/system/cluster.rs 
b/core/server/src/shard/system/cluster.rs
index 1619bb8aa..0003a24b5 100644
--- a/core/server/src/shard/system/cluster.rs
+++ b/core/server/src/shard/system/cluster.rs
@@ -18,6 +18,7 @@
 
 use crate::shard::IggyShard;
 use crate::streaming::session::Session;
+use err_trail::ErrContext;
 use iggy_common::{
     ClusterMetadata, ClusterNode, ClusterNodeRole, ClusterNodeStatus, 
IggyError, TransportProtocol,
 };
@@ -35,27 +36,46 @@ impl IggyShard {
         // TODO(hubcio): Clustering is not yet implemented
         // The leader/follower as well as node status are currently 
placeholder implementations.
 
-        let name = self.config.cluster.name.clone();
-        let id = self.config.cluster.id;
+        let cluster_name = self.config.cluster.name.clone();
+        let cluster_id = self.config.cluster.id;
 
-        // Parse transport string to TransportProtocol enum
         let transport = 
TransportProtocol::from_str(&self.config.cluster.transport)
+            .with_error(|e| {
+                format!(
+                    "Invalid cluster transport protocol: {}, error: {}",
+                    self.config.cluster.transport, e
+                )
+            })
             .map_err(|_| IggyError::InvalidConfiguration)?;
 
+        let own_node_id = self.config.cluster.node.id;
+
         let nodes: Vec<ClusterNode> = self
             .config
             .cluster
             .nodes
             .iter()
             .map(|node_config| {
-                let role = if node_config.id == 1 {
-                    ClusterNodeRole::Leader
+                let (role, status) = if node_config.id == own_node_id {
+                    (
+                        if self.is_follower {
+                            ClusterNodeRole::Follower
+                        } else {
+                            ClusterNodeRole::Leader
+                        },
+                        ClusterNodeStatus::Healthy,
+                    )
                 } else {
-                    ClusterNodeRole::Follower
+                    (
+                        if self.is_follower {
+                            ClusterNodeRole::Leader
+                        } else {
+                            ClusterNodeRole::Follower
+                        },
+                        ClusterNodeStatus::Healthy,
+                    )
                 };
 
-                let status = ClusterNodeStatus::Healthy;
-
                 ClusterNode {
                     id: node_config.id,
                     name: node_config.name.clone(),
@@ -67,8 +87,8 @@ impl IggyShard {
             .collect();
 
         Ok(ClusterMetadata {
-            name,
-            id,
+            name: cluster_name,
+            id: cluster_id,
             transport,
             nodes,
         })
diff --git a/core/server/src/tcp/connection_handler.rs 
b/core/server/src/tcp/connection_handler.rs
index 4df84fa0d..30e142e44 100644
--- a/core/server/src/tcp/connection_handler.rs
+++ b/core/server/src/tcp/connection_handler.rs
@@ -25,7 +25,7 @@ use crate::tcp::connection_handler::command::ServerCommand;
 use async_channel::Receiver;
 use bytes::BytesMut;
 use futures::FutureExt;
-use iggy_common::IggyError;
+use iggy_common::{GET_CLUSTER_METADATA_CODE, IggyError};
 use std::io::ErrorKind;
 use std::rc::Rc;
 use tracing::{debug, error, info};
@@ -89,19 +89,30 @@ pub(crate) async fn handle_connection(
                 );
             }
             Err(error) => {
-                error!(
-                    "Command with code {cmd_code} was not handled 
successfully, session: {session}, error: {error}."
-                );
-                if let IggyError::ClientNotFound(_) = error {
+                // Special handling for GetClusterMetadata when clustering is 
disabled
+                if cmd_code == GET_CLUSTER_METADATA_CODE
+                    && matches!(error, IggyError::FeatureUnavailable)
+                {
+                    debug!(
+                        "GetClusterMetadata command not available (clustering 
disabled), session: {session}."
+                    );
                     sender.send_error_response(error).await?;
                     debug!("TCP error response was sent to: {session}.");
-                    error!("Session: {session} will be deleted.");
-                    return Err(ConnectionError::from(IggyError::ClientNotFound(
-                        session.client_id,
-                    )));
                 } else {
-                    sender.send_error_response(error).await?;
-                    debug!("TCP error response was sent to: {session}.");
+                    error!(
+                        "Command with code {cmd_code} was not handled 
successfully, session: {session}, error: {error}."
+                    );
+                    if let IggyError::ClientNotFound(_) = error {
+                        sender.send_error_response(error).await?;
+                        debug!("TCP error response was sent to: {session}.");
+                        error!("Session: {session} will be deleted.");
+                        return 
Err(ConnectionError::from(IggyError::ClientNotFound(
+                            session.client_id,
+                        )));
+                    } else {
+                        sender.send_error_response(error).await?;
+                        debug!("TCP error response was sent to: {session}.");
+                    }
                 }
             }
         }

Reply via email to