This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch cluster-metadata in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 1d83c1a2c8ff4c658b64d6d1885fc4acaaa739b3 Author: Hubert Gruszecki <[email protected]> AuthorDate: Mon Nov 17 11:05:47 2025 +0100 feat(sdk): implement leader-aware reconnect mechanism in Rust SDK --- bdd/docker-compose.yml | 136 ++++++- bdd/rust/Cargo.toml | 12 +- bdd/rust/tests/common/global_context.rs | 5 - .../{global_context.rs => leader_context.rs} | 37 +- bdd/rust/tests/common/mod.rs | 1 + .../tests/{steps/mod.rs => leader_redirection.rs} | 16 +- bdd/rust/tests/steps/leader_redirection.rs | 415 +++++++++++++++++++++ bdd/rust/tests/steps/mod.rs | 1 + bdd/rust/tests/steps/server.rs | 26 +- bdd/scenarios/leader_redirection.feature | 61 +++ core/common/src/types/cluster/status.rs | 2 + core/configs/server.toml | 8 +- core/sdk/src/client_wrappers/connection_info.rs | 61 +++ core/sdk/src/client_wrappers/mod.rs | 1 + core/sdk/src/clients/binary_cluster.rs | 5 +- core/sdk/src/clients/binary_users.rs | 72 +++- core/sdk/src/clients/client.rs | 17 +- core/sdk/src/leader_aware.rs | 217 +++++++++++ core/sdk/src/lib.rs | 1 + core/sdk/src/prelude.rs | 27 +- core/sdk/src/quic/quic_client.rs | 313 ++++++++++------ core/sdk/src/tcp/tcp_client.rs | 410 +++++++++++--------- core/server/src/args.rs | 17 + core/server/src/main.rs | 6 + core/server/src/quic/listener.rs | 38 +- core/server/src/shard/builder.rs | 7 + core/server/src/shard/mod.rs | 1 + core/server/src/shard/system/cluster.rs | 40 +- core/server/src/tcp/connection_handler.rs | 33 +- 29 files changed, 1565 insertions(+), 421 deletions(-) diff --git a/bdd/docker-compose.yml b/bdd/docker-compose.yml index 86bbc9fa9..a923fc578 100644 --- a/bdd/docker-compose.yml +++ b/bdd/docker-compose.yml @@ -16,6 +16,7 @@ # under the License. 
services: + # Original single server for backward compatibility iggy-server: platform: linux/amd64 build: @@ -37,36 +38,159 @@ services: soft: -1 hard: -1 healthcheck: - test: ["CMD", "/usr/local/bin/iggy", "ping"] - interval: 1s + test: ["CMD", "/usr/local/bin/iggy", "--tcp-server-address", "127.0.0.1:8090", "ping"] + interval: 20s timeout: 3s retries: 30 start_period: 2s environment: - - IGGY_ROOT_USERNAME=iggy - - IGGY_ROOT_PASSWORD=iggy - RUST_LOG=info - IGGY_SYSTEM_PATH=local_data - IGGY_TCP_ADDRESS=0.0.0.0:8090 - IGGY_HTTP_ADDRESS=0.0.0.0:3000 - IGGY_QUIC_ADDRESS=0.0.0.0:8080 + - IGGY_WEBSOCKET_ADDRESS=0.0.0.0:8070 volumes: - iggy_data:/app/local_data networks: - iggy-bdd-network + # Leader server for cluster testing + iggy-leader: + platform: linux/amd64 + build: + context: .. + dockerfile: core/server/Dockerfile + target: runtime-prebuilt + args: + PREBUILT_IGGY_SERVER: ${IGGY_SERVER_PATH:-target/debug/iggy-server} + PREBUILT_IGGY_CLI: ${IGGY_CLI_PATH:-target/debug/iggy} + LIBC: glibc + PROFILE: debug + # Leader runs without --follower flag + command: ["--fresh", "--with-default-root-credentials"] + cap_add: + - SYS_NICE + security_opt: + - seccomp:unconfined + ulimits: + memlock: + soft: -1 + hard: -1 + healthcheck: + test: ["CMD", "/usr/local/bin/iggy", "--tcp-server-address", "127.0.0.1:8091", "ping"] + interval: 20s + timeout: 3s + retries: 30 + start_period: 2s + environment: + - RUST_LOG=info + - IGGY_SYSTEM_PATH=local_data_leader + # Server addresses + - IGGY_TCP_ADDRESS=0.0.0.0:8091 + - IGGY_HTTP_ADDRESS=0.0.0.0:3001 + - IGGY_QUIC_ADDRESS=0.0.0.0:8081 + - IGGY_WEBSOCKET_ADDRESS=0.0.0.0:8071 + # Cluster configuration + - IGGY_CLUSTER_ENABLED=true + - IGGY_CLUSTER_NAME=test-cluster + - IGGY_CLUSTER_ID=1 + - IGGY_CLUSTER_TRANSPORT=tcp + # This node's identity + - IGGY_CLUSTER_NODE_ID=0 + - IGGY_CLUSTER_NODE_NAME=leader-node + - IGGY_CLUSTER_NODE_ADDRESS=iggy-leader:8091 + # Cluster nodes configuration (indexed format for array) + - 
IGGY_CLUSTER_NODES_0_ID=0 + - IGGY_CLUSTER_NODES_0_NAME=leader-node + - IGGY_CLUSTER_NODES_0_ADDRESS=iggy-leader:8091 + - IGGY_CLUSTER_NODES_1_ID=1 + - IGGY_CLUSTER_NODES_1_NAME=follower-node + - IGGY_CLUSTER_NODES_1_ADDRESS=iggy-follower:8092 + volumes: + - iggy_leader_data:/app/local_data_leader + networks: + - iggy-bdd-network + + # Follower server for cluster testing + iggy-follower: + platform: linux/amd64 + build: + context: .. + dockerfile: core/server/Dockerfile + target: runtime-prebuilt + args: + PREBUILT_IGGY_SERVER: ${IGGY_SERVER_PATH:-target/debug/iggy-server} + PREBUILT_IGGY_CLI: ${IGGY_CLI_PATH:-target/debug/iggy} + LIBC: glibc + PROFILE: debug + # Follower runs with --follower flag + command: ["--fresh", "--with-default-root-credentials", "--follower"] + cap_add: + - SYS_NICE + security_opt: + - seccomp:unconfined + ulimits: + memlock: + soft: -1 + hard: -1 + healthcheck: + test: ["CMD", "/usr/local/bin/iggy", "--tcp-server-address", "127.0.0.1:8092", "ping"] + interval: 20s + timeout: 3s + retries: 30 + start_period: 2s + environment: + - RUST_LOG=info + - IGGY_SYSTEM_PATH=local_data_follower + # Server addresses (different ports from leader) + - IGGY_TCP_ADDRESS=0.0.0.0:8092 + - IGGY_HTTP_ADDRESS=0.0.0.0:3002 + - IGGY_QUIC_ADDRESS=0.0.0.0:8082 + - IGGY_WEBSOCKET_ADDRESS=0.0.0.0:8072 + # Cluster configuration (same as leader) + - IGGY_CLUSTER_ENABLED=true + - IGGY_CLUSTER_NAME=test-cluster + - IGGY_CLUSTER_ID=1 + - IGGY_CLUSTER_TRANSPORT=tcp + # This node's identity (different from leader) + - IGGY_CLUSTER_NODE_ID=1 + - IGGY_CLUSTER_NODE_NAME=follower-node + - IGGY_CLUSTER_NODE_ADDRESS=iggy-follower:8092 + # Cluster nodes configuration (indexed format for array) + - IGGY_CLUSTER_NODES_0_ID=0 + - IGGY_CLUSTER_NODES_0_NAME=leader-node + - IGGY_CLUSTER_NODES_0_ADDRESS=iggy-leader:8091 + - IGGY_CLUSTER_NODES_1_ID=1 + - IGGY_CLUSTER_NODES_1_NAME=follower-node + - IGGY_CLUSTER_NODES_1_ADDRESS=iggy-follower:8092 + volumes: + - 
iggy_follower_data:/app/local_data_follower + networks: + - iggy-bdd-network + rust-bdd: build: context: .. dockerfile: bdd/rust/Dockerfile depends_on: - - iggy-server + iggy-server: + condition: service_healthy + iggy-leader: + condition: service_healthy + iggy-follower: + condition: service_healthy environment: - IGGY_ROOT_USERNAME=iggy - IGGY_ROOT_PASSWORD=iggy - IGGY_TCP_ADDRESS=iggy-server:8090 + # Additional addresses for leader redirection tests + - IGGY_TCP_ADDRESS_LEADER=iggy-leader:8091 + - IGGY_TCP_ADDRESS_FOLLOWER=iggy-follower:8092 + - RUST_LOG=debug volumes: - ./scenarios/basic_messaging.feature:/app/features/basic_messaging.feature + - ./scenarios/leader_redirection.feature:/app/features/leader_redirection.feature command: [ "cargo", @@ -149,3 +273,5 @@ networks: volumes: iggy_data: + iggy_leader_data: + iggy_follower_data: diff --git a/bdd/rust/Cargo.toml b/bdd/rust/Cargo.toml index 18f2bf88e..ab8b019c0 100644 --- a/bdd/rust/Cargo.toml +++ b/bdd/rust/Cargo.toml @@ -41,11 +41,7 @@ name = "basic_messaging" harness = false required-features = ["bdd"] -# Future test scenarios can be added here: -# [[test]] -# name = "user_management" -# harness = false -# -# [[test]] -# name = "consumer_groups" -# harness = false +[[test]] +name = "leader_redirection" +harness = false +required-features = ["bdd"] diff --git a/bdd/rust/tests/common/global_context.rs b/bdd/rust/tests/common/global_context.rs index a526f873e..a425c6776 100644 --- a/bdd/rust/tests/common/global_context.rs +++ b/bdd/rust/tests/common/global_context.rs @@ -20,13 +20,8 @@ use cucumber::World; use iggy::clients::client::IggyClient; use iggy::prelude::{IggyMessage, PolledMessages}; -#[cfg(not(feature = "iggy-server-in-docker"))] -use integration::test_server::TestServer; - #[derive(Debug, World, Default)] pub struct GlobalContext { - #[cfg(not(feature = "iggy-server-in-docker"))] - pub server: Option<TestServer>, pub client: Option<IggyClient>, pub server_addr: Option<String>, pub 
last_stream_id: Option<u32>, diff --git a/bdd/rust/tests/common/global_context.rs b/bdd/rust/tests/common/leader_context.rs similarity index 58% copy from bdd/rust/tests/common/global_context.rs copy to bdd/rust/tests/common/leader_context.rs index a526f873e..913866f11 100644 --- a/bdd/rust/tests/common/global_context.rs +++ b/bdd/rust/tests/common/leader_context.rs @@ -18,22 +18,31 @@ use cucumber::World; use iggy::clients::client::IggyClient; -use iggy::prelude::{IggyMessage, PolledMessages}; - -#[cfg(not(feature = "iggy-server-in-docker"))] -use integration::test_server::TestServer; +use iggy::prelude::ClusterNode; +use std::collections::HashMap; #[derive(Debug, World, Default)] -pub struct GlobalContext { - #[cfg(not(feature = "iggy-server-in-docker"))] - pub server: Option<TestServer>, - pub client: Option<IggyClient>, - pub server_addr: Option<String>, +pub struct LeaderContext { + // Server addresses by name (e.g., "leader" -> "iggy-leader:8091") + pub server_addrs: HashMap<String, String>, + + // Multiple clients for testing + pub clients: HashMap<String, IggyClient>, + + // Track which client connected to which server + pub client_connections: HashMap<String, String>, + + // Track if redirection happened + pub redirection_occurred: bool, + + // Cluster configuration + pub cluster_enabled: bool, + pub node_count: usize, + + // Node configurations + pub nodes: Vec<ClusterNode>, + + // Fields for compatibility with existing step definitions pub last_stream_id: Option<u32>, pub last_stream_name: Option<String>, - pub last_topic_id: Option<u32>, - pub last_topic_name: Option<String>, - pub last_topic_partitions: Option<u32>, - pub last_polled_messages: Option<PolledMessages>, - pub last_sent_message: Option<IggyMessage>, } diff --git a/bdd/rust/tests/common/mod.rs b/bdd/rust/tests/common/mod.rs index 0c28f3a53..16682d2a9 100644 --- a/bdd/rust/tests/common/mod.rs +++ b/bdd/rust/tests/common/mod.rs @@ -17,3 +17,4 @@ */ pub mod global_context; +pub mod 
leader_context; diff --git a/bdd/rust/tests/steps/mod.rs b/bdd/rust/tests/leader_redirection.rs similarity index 75% copy from bdd/rust/tests/steps/mod.rs copy to bdd/rust/tests/leader_redirection.rs index e9d00b626..08598565e 100644 --- a/bdd/rust/tests/steps/mod.rs +++ b/bdd/rust/tests/leader_redirection.rs @@ -16,8 +16,14 @@ * under the License. */ -pub mod auth; -pub mod messages; -pub mod server; -pub mod streams; -pub mod topics; +pub(crate) mod common; +pub(crate) mod helpers; +pub(crate) mod steps; + +use crate::common::leader_context::LeaderContext; +use cucumber::World; + +#[tokio::main] +async fn main() { + LeaderContext::run("../../bdd/scenarios/leader_redirection.feature").await; +} diff --git a/bdd/rust/tests/steps/leader_redirection.rs b/bdd/rust/tests/steps/leader_redirection.rs new file mode 100644 index 000000000..0f7f7d72d --- /dev/null +++ b/bdd/rust/tests/steps/leader_redirection.rs @@ -0,0 +1,415 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use crate::common::leader_context::LeaderContext; +use cucumber::{given, then, when}; +use iggy::prelude::*; +use integration::tcp_client::TcpClientFactory; +use integration::test_server::{ClientFactory, login_root}; +use std::time::Duration; +use tokio::time::sleep; + +// Background steps for cluster configuration +#[given(regex = r"^I have cluster configuration enabled with (\d+) nodes$")] +async fn given_cluster_config(world: &mut LeaderContext, node_count: usize) { + world.cluster_enabled = true; + world.node_count = node_count; + world.nodes = Vec::with_capacity(node_count); +} + +#[given(regex = r"^node (\d+) is configured on port (\d+)$")] +async fn given_node_configured(world: &mut LeaderContext, node_id: u32, port: u16) { + let node = ClusterNode { + id: node_id, + name: format!("node-{}", node_id), + address: format!("iggy-server:{}", port), + role: ClusterNodeRole::Follower, + status: ClusterNodeStatus::Healthy, + }; + world.nodes.push(node); +} + +// Leader/follower server setup - in Docker mode, servers are already running +#[given(regex = r"^I start server (\d+) on port (\d+) as leader$")] +async fn given_start_leader_server(world: &mut LeaderContext, node_id: u32, port: u16) { + // In Docker mode, determine the address based on port + let addr = match port { + 8091 => std::env::var("IGGY_TCP_ADDRESS_LEADER") + .unwrap_or_else(|_| "iggy-leader:8091".to_string()), + _ => format!("iggy-server:{}", port), + }; + world.server_addrs.insert("leader".to_string(), addr); + + if let Some(node) = world + .nodes + .iter_mut() + .find(|n| n.id == node_id && n.address.ends_with(&format!(":{}", port))) + { + node.role = ClusterNodeRole::Leader; + } +} + +#[given(regex = r"^I start server (\d+) on port (\d+) as follower$")] +async fn given_start_follower_server(world: &mut LeaderContext, node_id: u32, port: u16) { + // In Docker mode, determine the address based on port + let addr = match port { + 8092 => std::env::var("IGGY_TCP_ADDRESS_FOLLOWER") + 
.unwrap_or_else(|_| "iggy-follower:8092".to_string()), + _ => format!("iggy-server:{}", port), + }; + world.server_addrs.insert("follower".to_string(), addr); + + if let Some(node) = world + .nodes + .iter_mut() + .find(|n| n.id == node_id && n.address.ends_with(&format!(":{}", port))) + { + node.role = ClusterNodeRole::Follower; + } +} + +#[given(regex = r"^I start a single server on port (\d+) without clustering enabled$")] +async fn given_single_server_no_cluster(world: &mut LeaderContext, port: u16) { + // In Docker mode, use the main iggy-server + let addr = match port { + 8090 => { + std::env::var("IGGY_TCP_ADDRESS").unwrap_or_else(|_| "iggy-server:8090".to_string()) + } + _ => format!("iggy-server:{}", port), + }; + world.server_addrs.insert("single".to_string(), addr); + world.cluster_enabled = false; +} + +// Client connection steps +#[when(regex = r"^I create a client connecting to follower on port (\d+)$")] +async fn when_connect_to_follower(world: &mut LeaderContext, port: u16) { + // Find the node with this port that is marked as follower + let node = world.nodes.iter().find(|n| { + n.address.ends_with(&format!(":{}", port)) && matches!(n.role, ClusterNodeRole::Follower) + }); + assert!( + node.is_some(), + "Follower node on port {} should be configured", + port + ); + + let addr = world + .server_addrs + .get("follower") + .expect("Follower server should be configured") + .clone(); + create_client_for_addr(world, "main", &addr).await; +} + +#[when(regex = r"^I create a client connecting directly to leader on port (\d+)$")] +async fn when_connect_to_leader(world: &mut LeaderContext, port: u16) { + // Find the node with this port that is marked as leader + let node = world.nodes.iter().find(|n| { + n.address.ends_with(&format!(":{}", port)) && matches!(n.role, ClusterNodeRole::Leader) + }); + assert!( + node.is_some(), + "Leader node on port {} should be configured", + port + ); + + let addr = world + .server_addrs + .get("leader") + .expect("Leader 
server should be configured") + .clone(); + create_client_for_addr(world, "main", &addr).await; + world.redirection_occurred = false; +} + +#[when(regex = r"^I create a client connecting to port (\d+)$")] +async fn when_connect_to_port(world: &mut LeaderContext, port: u16) { + assert_eq!(port, 8090, "Single server should be on port 8090"); + let addr = world + .server_addrs + .get("single") + .expect("Single server should be configured") + .clone(); + create_client_for_addr(world, "main", &addr).await; +} + +#[when(regex = r"^I create client ([A-Z]) connecting to port (\d+)$")] +async fn when_create_named_client(world: &mut LeaderContext, client_name: String, port: u16) { + // Determine which server based on port + let addr = if port == 8091 { + world.server_addrs.get("leader").cloned() + } else { + world.server_addrs.get("follower").cloned() + } + .expect("Server should be configured"); + create_client_for_addr(world, &client_name, &addr).await; +} + +// Authentication steps +#[when("I authenticate as root user")] +async fn when_authenticate_root(world: &mut LeaderContext) { + let client = world.clients.get("main").expect("Client should be created"); + + login_root(client).await; +} + +#[when("both clients authenticate as root user")] +async fn when_both_clients_authenticate(world: &mut LeaderContext) { + for (_, client) in world.clients.iter() { + login_root(client).await; + sleep(Duration::from_millis(100)).await; + } +} + +// Stream creation step for leader context +#[when(regex = r#"^I create a stream named "(.+)"$"#)] +async fn when_create_stream_in_cluster(world: &mut LeaderContext, stream_name: String) { + let client = world + .clients + .get("main") + .expect("Client should be available"); + let stream = client + .create_stream(&stream_name) + .await + .expect("Should be able to create stream"); + + world.last_stream_id = Some(stream.id); + world.last_stream_name = Some(stream.name.clone()); +} + +#[then("the stream should be created successfully on the 
leader")] +async fn then_stream_created_on_leader(world: &mut LeaderContext) { + assert!( + world.last_stream_id.is_some(), + "Stream should have been created on leader" + ); +} + +// Verification steps +#[then(regex = r"^the client should automatically redirect to leader on port (\d+)$")] +async fn then_client_redirected(world: &mut LeaderContext, leader_port: u16) { + let client = world.clients.get("main").expect("Client should exist"); + + // Get the actual connection info to verify we're connected to the leader + let connection_info = client.get_connection_info().await; + + // Check if the connection address matches the leader port + assert!( + connection_info + .server_address + .contains(&format!(":{}", leader_port)), + "Client should be connected to leader on port {}, but is connected to: {}", + leader_port, + connection_info.server_address + ); + + // Also verify through cluster metadata + if let Ok(metadata) = client.get_cluster_metadata().await { + let leader_node = metadata + .nodes + .iter() + .find(|n| matches!(n.role, ClusterNodeRole::Leader)); + + assert!( + leader_node.is_some(), + "Leader node should be found in cluster metadata" + ); + } + + world.redirection_occurred = true; +} + +#[then("the client should not perform any redirection")] +async fn then_no_redirection(world: &mut LeaderContext) { + assert!( + !world.redirection_occurred, + "No redirection should occur when connecting directly to leader" + ); +} + +#[then(regex = r"^the connection should remain on port (\d+)$")] +async fn then_connection_remains(world: &mut LeaderContext, port: u16) { + let client = world.clients.get("main").expect("Client should exist"); + + // Get the actual connection info + let connection_info = client.get_connection_info().await; + + // Verify the client is still connected to the original port + assert!( + connection_info + .server_address + .contains(&format!(":{}", port)), + "Connection should remain on port {}, but is connected to: {}", + port, + 
connection_info.server_address + ); + + // Verify no redirection occurred + assert!( + !world.redirection_occurred, + "Connection should not have been redirected" + ); +} + +#[then("the client should connect successfully without redirection")] +async fn then_connect_without_redirection(world: &mut LeaderContext) { + let client = world.clients.get("main").expect("Client should exist"); + + // Verify client is connected and can ping + let ping_result = client.ping().await; + assert!(ping_result.is_ok(), "Client should be able to ping server"); + assert!( + !world.redirection_occurred, + "No redirection should occur without clustering" + ); +} + +#[then(regex = r"^client ([A-Z]) should stay connected to port (\d+)$")] +async fn then_client_stays_connected(world: &mut LeaderContext, client_name: String, port: u16) { + let client = world + .clients + .get(&client_name) + .expect("Client should exist"); + + // Get the actual connection info + let connection_info = client.get_connection_info().await; + + // Verify the client is still connected to the expected port + assert!( + connection_info + .server_address + .contains(&format!(":{}", port)), + "Client {} should stay connected to port {}, but is connected to: {}", + client_name, + port, + connection_info.server_address + ); + + let ping_result = client.ping().await; + assert!( + ping_result.is_ok(), + "Client {} should be able to ping", + client_name + ); +} + +#[then(regex = r"^client ([A-Z]) should redirect to port (\d+)$")] +async fn then_client_redirects(world: &mut LeaderContext, client_name: String, leader_port: u16) { + let client = world + .clients + .get(&client_name) + .expect("Client should exist"); + + // Get the actual connection info to verify redirection + let connection_info = client.get_connection_info().await; + + // Verify the client is now connected to the leader port + assert!( + connection_info + .server_address + .contains(&format!(":{}", leader_port)), + "Client {} should redirect to port {}, 
but is connected to: {}", + client_name, + leader_port, + connection_info.server_address + ); + + // After authentication, the SDK should have redirected to leader + if let Ok(metadata) = client.get_cluster_metadata().await { + let leader = metadata + .nodes + .iter() + .find(|n| matches!(n.role, ClusterNodeRole::Leader)); + + assert!( + leader.is_some(), + "Client {} should find leader in cluster metadata", + client_name + ); + } +} + +#[then("both clients should be using the same server")] +async fn then_both_use_same_server(world: &mut LeaderContext) { + let client_a = world.clients.get("A").expect("Client A should exist"); + let client_b = world.clients.get("B").expect("Client B should exist"); + + // Get connection info for both clients + let conn_info_a = client_a.get_connection_info().await; + let conn_info_b = client_b.get_connection_info().await; + + // Verify both clients are connected to the same server + assert_eq!( + conn_info_a.server_address, conn_info_b.server_address, + "Both clients should be connected to the same server. 
Client A: {}, Client B: {}", + conn_info_a.server_address, conn_info_b.server_address + ); + + // Both clients should be able to ping the server + assert!( + client_a.ping().await.is_ok(), + "Client A should be able to ping" + ); + assert!( + client_b.ping().await.is_ok(), + "Client B should be able to ping" + ); + + // If cluster metadata is available, verify both see the same leader + if let (Ok(metadata_a), Ok(metadata_b)) = ( + client_a.get_cluster_metadata().await, + client_b.get_cluster_metadata().await, + ) { + let leader_a = metadata_a + .nodes + .iter() + .find(|n| matches!(n.role, ClusterNodeRole::Leader)); + + let leader_b = metadata_b + .nodes + .iter() + .find(|n| matches!(n.role, ClusterNodeRole::Leader)); + + if let (Some(leader_a), Some(leader_b)) = (leader_a, leader_b) { + assert_eq!( + leader_a.address, leader_b.address, + "Both clients should see the same leader" + ); + } + } +} + +// Helper function to create a client for a given address +async fn create_client_for_addr(world: &mut LeaderContext, client_name: &str, addr: &str) { + let client_factory = TcpClientFactory { + server_addr: addr.to_string(), + ..Default::default() + }; + + let client = client_factory.create_client().await; + let client = IggyClient::create(client, None, None); + + // Connect the client before authentication + client.connect().await.expect("Client should connect"); + + world + .client_connections + .insert(client_name.to_string(), addr.to_string()); + world.clients.insert(client_name.to_string(), client); +} diff --git a/bdd/rust/tests/steps/mod.rs b/bdd/rust/tests/steps/mod.rs index e9d00b626..45c8bde63 100644 --- a/bdd/rust/tests/steps/mod.rs +++ b/bdd/rust/tests/steps/mod.rs @@ -17,6 +17,7 @@ */ pub mod auth; +pub mod leader_redirection; pub mod messages; pub mod server; pub mod streams; diff --git a/bdd/rust/tests/steps/server.rs b/bdd/rust/tests/steps/server.rs index ad688f73b..a4027875e 100644 --- a/bdd/rust/tests/steps/server.rs +++ 
b/bdd/rust/tests/steps/server.rs @@ -19,28 +19,10 @@ use crate::common::global_context::GlobalContext; use cucumber::given; -#[cfg(not(feature = "iggy-server-in-docker"))] -use integration::test_server::TestServer; - #[given("I have a running Iggy server")] pub async fn given_running_server(world: &mut GlobalContext) { - #[cfg(feature = "iggy-server-in-docker")] - { - // External server mode - connect to server from environment - let server_addr = - std::env::var("IGGY_TCP_ADDRESS").unwrap_or_else(|_| "localhost:8090".to_string()); - world.server_addr = Some(server_addr); - // No TestServer instance in external mode - } - - #[cfg(not(feature = "iggy-server-in-docker"))] - { - // Embedded server mode - start our own TestServer - let mut test_server = TestServer::default(); - test_server.start(); - let server_addr = test_server.get_raw_tcp_addr().unwrap(); - - world.server_addr = Some(server_addr); - world.server = Some(test_server); - } + // External server mode - connect to server from environment + let server_addr = + std::env::var("IGGY_TCP_ADDRESS").unwrap_or_else(|_| "localhost:8090".to_string()); + world.server_addr = Some(server_addr); } diff --git a/bdd/scenarios/leader_redirection.feature b/bdd/scenarios/leader_redirection.feature new file mode 100644 index 000000000..0f6c76701 --- /dev/null +++ b/bdd/scenarios/leader_redirection.feature @@ -0,0 +1,61 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +Feature: Leader-Aware Client Connections + As a developer using Apache Iggy with clustering + I want my SDK clients to automatically connect to the leader node + So that I can always interact with the correct server instance + + Background: + Given I have cluster configuration enabled with 2 nodes + And node 0 is configured on port 8091 + And node 1 is configured on port 8092 + + Scenario: Client redirects from follower to leader + Given I start server 0 on port 8091 as leader + And I start server 1 on port 8092 as follower + When I create a client connecting to follower on port 8092 + And I authenticate as root user + Then the client should automatically redirect to leader on port 8091 + When I create a stream named "test-stream" + Then the stream should be created successfully on the leader + + Scenario: Client connects directly to leader without redirection + Given I start server 0 on port 8091 as leader + And I start server 1 on port 8092 as follower + When I create a client connecting directly to leader on port 8091 + And I authenticate as root user + Then the client should not perform any redirection + And the connection should remain on port 8091 + + Scenario: Client handles missing cluster metadata gracefully + Given I start a single server on port 8090 without clustering enabled + When I create a client connecting to port 8090 + And I authenticate as root user + Then the client should connect successfully without redirection + When I create a stream named "single-server-stream" + Then the stream should be created successfully on the leader + + 
Scenario: Multiple clients converge to the same leader + Given I start server 0 on port 8091 as leader + And I start server 1 on port 8092 as follower + When I create client A connecting to port 8091 + And I create client B connecting to port 8092 + And both clients authenticate as root user + Then client A should stay connected to port 8091 + And client B should redirect to port 8091 + And both clients should be using the same server \ No newline at end of file diff --git a/core/common/src/types/cluster/status.rs b/core/common/src/types/cluster/status.rs index e60932f1a..ddb8c07d3 100644 --- a/core/common/src/types/cluster/status.rs +++ b/core/common/src/types/cluster/status.rs @@ -38,6 +38,8 @@ pub enum ClusterNodeStatus { Unreachable, /// Node is in maintenance mode Maintenance, + /// Node is unknown + Unknown, } impl TryFrom<u8> for ClusterNodeStatus { diff --git a/core/configs/server.toml b/core/configs/server.toml index db689b3ea..42f01d555 100644 --- a/core/configs/server.toml +++ b/core/configs/server.toml @@ -511,7 +511,7 @@ enabled = false # Unique cluster identifier (u32). # All nodes in the same cluster must share the same cluster id. -id = 1 +id = 0 # Unique cluster name (string). # All nodes in the same cluster must share the same name. @@ -526,16 +526,16 @@ transport = "tcp" [cluster.node] # This node unique identifier within the cluster (u32). # Must be unique across all nodes and match one of the ids in the nodes list. -id = 1 +id = 0 # All nodes in the cluster. 
[[cluster.nodes]] -id = 1 +id = 0 name = "iggy-node-1" address = "127.0.0.1:8090" [[cluster.nodes]] -id = 2 +id = 1 name = "iggy-node-2" address = "127.0.0.1:8091" diff --git a/core/sdk/src/client_wrappers/connection_info.rs b/core/sdk/src/client_wrappers/connection_info.rs new file mode 100644 index 000000000..2d6d52d4a --- /dev/null +++ b/core/sdk/src/client_wrappers/connection_info.rs @@ -0,0 +1,61 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::client_wrappers::client_wrapper::ClientWrapper; +use iggy_common::TransportProtocol; + +/// Connection information for the current client connection +#[derive(Debug, Clone)] +pub struct ConnectionInfo { + /// The transport protocol being used + pub protocol: TransportProtocol, + /// The current server address the client is connected to + pub server_address: String, +} + +impl ClientWrapper { + /// Returns the current connection information including protocol and server address + pub async fn get_connection_info(&self) -> ConnectionInfo { + match self { + ClientWrapper::Iggy(_) => { + // This variant should not be used in practice as IggyClient always wraps + // one of the concrete transport clients. Return a default/placeholder. 
+ ConnectionInfo { + protocol: TransportProtocol::Tcp, + server_address: String::from("unknown"), + } + } + ClientWrapper::Tcp(client) => ConnectionInfo { + protocol: TransportProtocol::Tcp, + server_address: client.current_server_address.lock().await.clone(), + }, + ClientWrapper::Quic(client) => ConnectionInfo { + protocol: TransportProtocol::Quic, + server_address: client.current_server_address.lock().await.to_string(), + }, + ClientWrapper::Http(client) => ConnectionInfo { + protocol: TransportProtocol::Http, + server_address: client.api_url.to_string(), + }, + ClientWrapper::WebSocket(client) => ConnectionInfo { + protocol: TransportProtocol::WebSocket, + server_address: client.config.server_address.clone(), + }, + } + } +} diff --git a/core/sdk/src/client_wrappers/mod.rs b/core/sdk/src/client_wrappers/mod.rs index bd093e01b..2cf980606 100644 --- a/core/sdk/src/client_wrappers/mod.rs +++ b/core/sdk/src/client_wrappers/mod.rs @@ -29,3 +29,4 @@ mod binary_system_client; mod binary_topic_client; mod binary_user_client; pub mod client_wrapper; +pub mod connection_info; diff --git a/core/sdk/src/clients/binary_cluster.rs b/core/sdk/src/clients/binary_cluster.rs index 18ffc6f9b..6ab7ead4a 100644 --- a/core/sdk/src/clients/binary_cluster.rs +++ b/core/sdk/src/clients/binary_cluster.rs @@ -19,14 +19,11 @@ use crate::prelude::IggyClient; use async_trait::async_trait; use iggy_binary_protocol::ClusterClient; -use iggy_common::{ClusterMetadata, IggyError}; +use iggy_common::{ClusterMetadata, IggyError, locking::IggyRwLockFn}; #[async_trait] impl ClusterClient for IggyClient { async fn get_cluster_metadata(&self) -> Result<ClusterMetadata, IggyError> { - todo!(); - /* self.client.read().await.get_cluster_metadata().await - */ } } diff --git a/core/sdk/src/clients/binary_users.rs b/core/sdk/src/clients/binary_users.rs index c13ab9288..9390a6021 100644 --- a/core/sdk/src/clients/binary_users.rs +++ b/core/sdk/src/clients/binary_users.rs @@ -16,13 +16,16 @@ * under the 
License. */ +use crate::client_wrappers::client_wrapper::ClientWrapper; +use crate::leader_aware::check_and_redirect_to_leader; use crate::prelude::IggyClient; use async_trait::async_trait; -use iggy_binary_protocol::UserClient; +use iggy_binary_protocol::{Client, UserClient}; use iggy_common::locking::IggyRwLockFn; use iggy_common::{ Identifier, IdentityInfo, IggyError, Permissions, UserInfo, UserInfoDetails, UserStatus, }; +use tracing::info; #[async_trait] impl UserClient for IggyClient { @@ -91,11 +94,74 @@ impl UserClient for IggyClient { } async fn login_user(&self, username: &str, password: &str) -> Result<IdentityInfo, IggyError> { - self.client + // Perform the initial login + let identity = self + .client .read() .await .login_user(username, password) - .await + .await?; + + // Check if we need to redirect to leader (only for TCP and QUIC clients) + let new_leader_addr = { + let client = self.client.read().await; + match &*client { + ClientWrapper::Tcp(tcp_client) => { + let current_addr = tcp_client.current_server_address.lock().await.clone(); + check_and_redirect_to_leader(tcp_client, &current_addr).await? + } + ClientWrapper::Quic(quic_client) => { + let current_addr = quic_client.current_server_address.lock().await.to_string(); + check_and_redirect_to_leader(quic_client, &current_addr).await?
+ } + _ => None, // HTTP and WebSocket don't support leader redirection + } + }; + + // If redirection is needed, disconnect, update address, and reconnect + if let Some(leader_addr) = new_leader_addr { + info!( + "Redirecting to leader at {} after manual login", + leader_addr + ); + + // Disconnect from current server + self.client.read().await.disconnect().await?; + + // Update the server address and reconnect + { + let client = self.client.read().await; + match &*client { + ClientWrapper::Tcp(tcp_client) => { + *tcp_client.current_server_address.lock().await = leader_addr.clone(); + } + ClientWrapper::Quic(quic_client) => { + use std::net::SocketAddr; + use std::str::FromStr; + + if let Ok(socket_addr) = SocketAddr::from_str(&leader_addr) { + *quic_client.current_server_address.lock().await = socket_addr; + } + } + _ => {} + } + } + + // Reconnect to the leader + self.client.read().await.connect().await?; + + // Re-authenticate with the leader + let identity = self + .client + .read() + .await + .login_user(username, password) + .await?; + + Ok(identity) + } else { + Ok(identity) + } } async fn logout_user(&self) -> Result<(), IggyError> { diff --git a/core/sdk/src/clients/client.rs b/core/sdk/src/clients/client.rs index 15b39db2b..b2dd19689 100644 --- a/core/sdk/src/clients/client.rs +++ b/core/sdk/src/clients/client.rs @@ -16,11 +16,9 @@ * under the License. 
*/ -use crate::clients::client_builder::IggyClientBuilder; -use iggy_common::Consumer; -use iggy_common::locking::{IggyRwLock, IggyRwLockFn}; - use crate::client_wrappers::client_wrapper::ClientWrapper; +use crate::client_wrappers::connection_info::ConnectionInfo; +use crate::clients::client_builder::IggyClientBuilder; use crate::http::http_client::HttpClient; use crate::prelude::EncryptorKind; use crate::prelude::IggyConsumerBuilder; @@ -32,6 +30,8 @@ use crate::websocket::websocket_client::WebSocketClient; use async_broadcast::Receiver; use async_trait::async_trait; use iggy_binary_protocol::{Client, SystemClient}; +use iggy_common::Consumer; +use iggy_common::locking::{IggyRwLock, IggyRwLockFn}; use iggy_common::{ConnectionStringUtils, DiagnosticEvent, Partitioner, TransportProtocol}; use std::fmt::Debug; use std::sync::Arc; @@ -63,7 +63,6 @@ impl IggyClient { IggyClientBuilder::new() } - /// Creates a new `IggyClientBuilder` from the provided connection string. /// Creates a new `IggyClientBuilder` from the provided connection string. pub fn builder_from_connection_string( connection_string: &str, @@ -81,7 +80,6 @@ impl IggyClient { } } - /// Creates a new `IggyClient` from the provided connection string. /// Creates a new `IggyClient` from the provided connection string. pub fn from_connection_string(connection_string: &str) -> Result<Self, IggyError> { match ConnectionStringUtils::parse_protocol(connection_string)? { @@ -177,6 +175,13 @@ impl IggyClient { None, )) } + + /// Returns the current connection information including the transport protocol and server address. + /// This is useful for verifying which server the client is connected to, especially after + /// leader redirection in a clustered environment. 
+ pub async fn get_connection_info(&self) -> ConnectionInfo { + self.client.read().await.get_connection_info().await + } } #[async_trait] diff --git a/core/sdk/src/leader_aware.rs b/core/sdk/src/leader_aware.rs new file mode 100644 index 000000000..0a17b33b9 --- /dev/null +++ b/core/sdk/src/leader_aware.rs @@ -0,0 +1,217 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use iggy_binary_protocol::ClusterClient; +use iggy_common::{ + ClusterMetadata, ClusterNodeRole, ClusterNodeStatus, IggyError, IggyErrorDiscriminants, +}; +use std::net::SocketAddr; +use std::str::FromStr; +use tracing::{debug, info, warn}; + +/// Maximum number of leader redirections to prevent infinite loops +const MAX_LEADER_REDIRECTS: u8 = 3; + +/// Check if we need to redirect to leader and return the leader address if redirection is needed +pub async fn check_and_redirect_to_leader<C: ClusterClient>( + client: &C, + current_address: &str, +) -> Result<Option<String>, IggyError> { + debug!("Checking cluster metadata for leader detection"); + + // Try to get cluster metadata + match client.get_cluster_metadata().await { + Ok(metadata) => { + debug!( + "Got cluster metadata: {} nodes, cluster: {}", + metadata.nodes.len(), + metadata.name + ); + process_cluster_metadata(&metadata, current_address) + } + Err(e) if is_feature_unavailable_error(&e) => { + debug!("Cluster metadata feature unavailable - server doesn't support clustering"); + Ok(None) + } + Err(e) => { + warn!("Failed to get cluster metadata: {}", e); + // Don't fail the connection, just continue with current node + Ok(None) + } + } +} + +/// Process cluster metadata and determine if redirection is needed +fn process_cluster_metadata( + metadata: &ClusterMetadata, + current_address: &str, +) -> Result<Option<String>, IggyError> { + // Find active leader node + let leader = metadata + .nodes + .iter() + .find(|n| n.role == ClusterNodeRole::Leader && n.status == ClusterNodeStatus::Healthy); + + match leader { + Some(leader_node) => { + info!( + "Found leader node: {} at {}", + leader_node.name, leader_node.address + ); + + // Check if we're already connected to the leader + if !is_same_address(current_address, &leader_node.address) { + info!( + "Current connection to {} is not the leader, will redirect to {}", + current_address, leader_node.address + ); + Ok(Some(leader_node.address.clone())) + } 
else { + debug!("Already connected to leader at {}", current_address); + Ok(None) + } + } + None => { + warn!("No active leader found in cluster metadata"); + // Continue with current connection if no leader is found + Ok(None) + } + } +} + +/// Check if two addresses refer to the same endpoint +/// Handles various formats like 127.0.0.1:8090 vs localhost:8090 +fn is_same_address(addr1: &str, addr2: &str) -> bool { + // Try to parse as socket addresses for comparison + match (parse_address(addr1), parse_address(addr2)) { + (Some(sock1), Some(sock2)) => { + // Compare IP and port + sock1.ip() == sock2.ip() && sock1.port() == sock2.port() + } + _ => { + // Fallback to string comparison if parsing fails + normalize_address(addr1) == normalize_address(addr2) + } + } +} + +/// Parse address string to SocketAddr, handling various formats +fn parse_address(addr: &str) -> Option<SocketAddr> { + // Try direct parsing + if let Ok(socket_addr) = SocketAddr::from_str(addr) { + return Some(socket_addr); + } + + // Try with common replacements + let normalized = addr + .replace("localhost", "127.0.0.1") + .replace("[::]", "[::1]"); + + SocketAddr::from_str(&normalized).ok() +} + +/// Normalize address string for comparison +fn normalize_address(addr: &str) -> String { + addr.to_lowercase() + .replace("localhost", "127.0.0.1") + .replace("[::]", "[::1]") +} + +/// Check if the error indicates that the feature is unavailable +fn is_feature_unavailable_error(error: &IggyError) -> bool { + matches!( + error, + IggyError::FeatureUnavailable | IggyError::InvalidCommand + ) || error.as_code() == IggyErrorDiscriminants::FeatureUnavailable as u32 +} + +/// Struct to track leader redirection state +#[derive(Debug, Clone)] +pub struct LeaderRedirectionState { + pub redirect_count: u8, + pub last_leader_address: Option<String>, +} + +impl LeaderRedirectionState { + pub fn new() -> Self { + Self { + redirect_count: 0, + last_leader_address: None, + } + } + + pub fn can_redirect(&self) -> 
bool { + self.redirect_count < MAX_LEADER_REDIRECTS + } + + pub fn increment_redirect(&mut self, leader_address: String) { + self.redirect_count += 1; + self.last_leader_address = Some(leader_address); + } + + pub fn reset(&mut self) { + self.redirect_count = 0; + self.last_leader_address = None; + } +} + +impl Default for LeaderRedirectionState { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_is_same_address() { + assert!(is_same_address("127.0.0.1:8090", "127.0.0.1:8090")); + assert!(is_same_address("localhost:8090", "127.0.0.1:8090")); + assert!(!is_same_address("127.0.0.1:8090", "127.0.0.1:8091")); + assert!(!is_same_address("192.168.1.1:8090", "127.0.0.1:8090")); + } + + #[test] + fn test_normalize_address() { + assert_eq!(normalize_address("localhost:8090"), "127.0.0.1:8090"); + assert_eq!(normalize_address("LOCALHOST:8090"), "127.0.0.1:8090"); + assert_eq!(normalize_address("[::]:8090"), "[::1]:8090"); + } + + #[test] + fn test_leader_redirection_state() { + let mut state = LeaderRedirectionState::new(); + assert!(state.can_redirect()); + assert_eq!(state.redirect_count, 0); + + state.increment_redirect("127.0.0.1:8090".to_string()); + assert!(state.can_redirect()); + assert_eq!(state.redirect_count, 1); + + state.increment_redirect("127.0.0.1:8091".to_string()); + state.increment_redirect("127.0.0.1:8092".to_string()); + assert!(!state.can_redirect()); + assert_eq!(state.redirect_count, 3); + + state.reset(); + assert!(state.can_redirect()); + assert_eq!(state.redirect_count, 0); + } +} diff --git a/core/sdk/src/lib.rs b/core/sdk/src/lib.rs index c68b7dd5a..59604e296 100644 --- a/core/sdk/src/lib.rs +++ b/core/sdk/src/lib.rs @@ -22,6 +22,7 @@ pub mod client_wrappers; pub mod clients; pub mod consumer_ext; pub mod http; +mod leader_aware; pub mod prelude; pub mod quic; pub mod stream_builder; diff --git a/core/sdk/src/prelude.rs b/core/sdk/src/prelude.rs index d89afe86a..8780dd947 100644 --- 
a/core/sdk/src/prelude.rs +++ b/core/sdk/src/prelude.rs @@ -30,6 +30,7 @@ pub use crate::client_provider; pub use crate::client_provider::ClientProviderConfig; pub use crate::client_wrappers::client_wrapper::ClientWrapper; +pub use crate::client_wrappers::connection_info::ConnectionInfo; pub use crate::clients::client::IggyClient; pub use crate::clients::client_builder::IggyClientBuilder; pub use crate::clients::consumer::{ @@ -53,19 +54,19 @@ pub use iggy_binary_protocol::{ }; pub use iggy_common::{ Aes256GcmEncryptor, Args, ArgsOptional, AutoLogin, BytesSerializable, CacheMetrics, - CacheMetricsKey, ClientError, ClientInfoDetails, CompressionAlgorithm, Consumer, - ConsumerGroupDetails, ConsumerKind, EncryptorKind, FlushUnsavedBuffer, GlobalPermissions, - HeaderKey, HeaderValue, HttpClientConfig, HttpClientConfigBuilder, IdKind, Identifier, - IdentityInfo, IggyByteSize, IggyDuration, IggyError, IggyExpiry, IggyIndexView, IggyMessage, - IggyMessageHeader, IggyMessageHeaderView, IggyMessageView, IggyMessageViewIterator, - IggyTimestamp, MaxTopicSize, Partition, Partitioner, Partitioning, Permissions, - PersonalAccessTokenExpiry, PollMessages, PolledMessages, PollingKind, PollingStrategy, - QuicClientConfig, QuicClientConfigBuilder, QuicClientReconnectionConfig, SendMessages, - Sizeable, SnapshotCompression, Stats, Stream, StreamDetails, StreamPermissions, - SystemSnapshotType, TcpClientConfig, TcpClientConfigBuilder, TcpClientReconnectionConfig, - Topic, TopicDetails, TopicPermissions, TransportProtocol, UserId, UserStatus, Validatable, - WebSocketClientConfig, WebSocketClientConfigBuilder, WebSocketClientReconnectionConfig, - defaults, locking, + CacheMetricsKey, ClientError, ClientInfoDetails, ClusterMetadata, ClusterNode, ClusterNodeRole, + ClusterNodeStatus, CompressionAlgorithm, Consumer, ConsumerGroupDetails, ConsumerKind, + EncryptorKind, FlushUnsavedBuffer, GlobalPermissions, HeaderKey, HeaderValue, HttpClientConfig, + HttpClientConfigBuilder, IdKind, 
Identifier, IdentityInfo, IggyByteSize, IggyDuration, + IggyError, IggyExpiry, IggyIndexView, IggyMessage, IggyMessageHeader, IggyMessageHeaderView, + IggyMessageView, IggyMessageViewIterator, IggyTimestamp, MaxTopicSize, Partition, Partitioner, + Partitioning, Permissions, PersonalAccessTokenExpiry, PollMessages, PolledMessages, + PollingKind, PollingStrategy, QuicClientConfig, QuicClientConfigBuilder, + QuicClientReconnectionConfig, SendMessages, Sizeable, SnapshotCompression, Stats, Stream, + StreamDetails, StreamPermissions, SystemSnapshotType, TcpClientConfig, TcpClientConfigBuilder, + TcpClientReconnectionConfig, Topic, TopicDetails, TopicPermissions, TransportProtocol, UserId, + UserStatus, Validatable, WebSocketClientConfig, WebSocketClientConfigBuilder, + WebSocketClientReconnectionConfig, defaults, locking, }; pub use iggy_common::{ IGGY_MESSAGE_CHECKSUM_OFFSET_RANGE, IGGY_MESSAGE_HEADER_SIZE, diff --git a/core/sdk/src/quic/quic_client.rs b/core/sdk/src/quic/quic_client.rs index 831890cb4..2f5e3c6d1 100644 --- a/core/sdk/src/quic/quic_client.rs +++ b/core/sdk/src/quic/quic_client.rs @@ -16,6 +16,7 @@ * under the License. 
*/ +use crate::leader_aware::{LeaderRedirectionState, check_and_redirect_to_leader}; use crate::prelude::AutoLogin; use iggy_binary_protocol::{ BinaryClient, BinaryTransport, Client, PersonalAccessTokenClient, UserClient, @@ -28,7 +29,7 @@ use async_trait::async_trait; use bytes::Bytes; use iggy_common::{ ClientState, Command, ConnectionString, ConnectionStringUtils, Credentials, DiagnosticEvent, - QuicConnectionStringOptions, TransportProtocol, + IggyErrorDiscriminants, QuicConnectionStringOptions, TransportProtocol, }; use quinn::crypto::rustls::QuicClientConfig as QuinnQuicClientConfig; use quinn::{ClientConfig, Connection, Endpoint, IdleTimeout, RecvStream, VarInt}; @@ -51,10 +52,11 @@ pub struct QuicClient { pub(crate) endpoint: Endpoint, pub(crate) connection: Arc<Mutex<Option<Connection>>>, pub(crate) config: Arc<QuicClientConfig>, - pub(crate) server_address: SocketAddr, pub(crate) state: Mutex<ClientState>, events: (Sender<DiagnosticEvent>, Receiver<DiagnosticEvent>), connected_at: Mutex<Option<IggyTimestamp>>, + leader_redirection_state: Mutex<LeaderRedirectionState>, + pub(crate) current_server_address: Mutex<SocketAddr>, } unsafe impl Send for QuicClient {} @@ -126,9 +128,10 @@ impl BinaryTransport for QuicClient { } self.disconnect().await?; + let server_address = self.current_server_address.lock().await.to_string(); info!( "Reconnecting to the server: {}, by client: {}", - self.config.server_address, self.config.client_address + server_address, self.config.client_address ); self.connect().await?; self.send_raw(code, payload).await @@ -195,11 +198,12 @@ impl QuicClient { Ok(Self { config, endpoint, - server_address, connection: Arc::new(Mutex::new(None)), state: Mutex::new(ClientState::Disconnected), events: broadcast(1000), connected_at: Mutex::new(None), + leader_redirection_state: Mutex::new(LeaderRedirectionState::new()), + current_server_address: Mutex::new(server_address), }) } @@ -235,11 +239,20 @@ impl QuicClient { .map_err(|_| 
IggyError::InvalidNumberEncoding)?, ); if status != 0 { - error!( - "Received an invalid response with status: {} ({}).", - status, - IggyError::from_code_as_string(status) - ); + // Log FeatureUnavailable as debug instead of error (e.g., when clustering is disabled) + if status == IggyErrorDiscriminants::FeatureUnavailable as u32 { + tracing::debug!( + "Feature unavailable on server: {} ({})", + status, + IggyError::from_code_as_string(status) + ); + } else { + error!( + "Received an invalid response with status: {} ({}).", + status, + IggyError::from_code_as_string(status) + ); + } return Err(IggyError::from_code(status)); } @@ -260,135 +273,189 @@ impl QuicClient { } async fn connect(&self) -> Result<(), IggyError> { - match self.get_state().await { - ClientState::Shutdown => { - trace!("Cannot connect. Client is shutdown."); - return Err(IggyError::ClientShutdown); - } - ClientState::Connected | ClientState::Authenticating | ClientState::Authenticated => { - trace!("Client is already connected."); - return Ok(()); - } - ClientState::Connecting => { - trace!("Client is already connecting."); - return Ok(()); - } - _ => {} - } - - self.set_state(ClientState::Connecting).await; - if let Some(connected_at) = self.connected_at.lock().await.as_ref() { - let now = IggyTimestamp::now(); - let elapsed = now.as_micros() - connected_at.as_micros(); - let interval = self.config.reconnection.reestablish_after.as_micros(); - trace!( - "Elapsed time since last connection: {}", - IggyDuration::from(elapsed) - ); - if elapsed < interval { - let remaining = IggyDuration::from(interval - elapsed); - info!("Trying to connect to the server in: {remaining}",); - sleep(remaining.get_duration()).await; - } - } + self.connect_with_leader_detection().await + } - let mut retry_count = 0; - let connection; - let remote_address; + async fn connect_with_leader_detection(&self) -> Result<(), IggyError> { + // Leader detection loop loop { - info!( - "{NAME} client is connecting to server: 
{}...", - self.config.server_address - ); - let connection_result = self - .endpoint - .connect(self.server_address, &self.config.server_name) - .unwrap() - .await; - - if connection_result.is_err() { - error!( - "Failed to connect to server: {}", - self.config.server_address - ); - if !self.config.reconnection.enabled { - warn!("Automatic reconnection is disabled."); - return Err(IggyError::CannotEstablishConnection); + match self.get_state().await { + ClientState::Shutdown => { + trace!("Cannot connect. Client is shutdown."); + return Err(IggyError::ClientShutdown); } - - let unlimited_retries = self.config.reconnection.max_retries.is_none(); - let max_retries = self.config.reconnection.max_retries.unwrap_or_default(); - let max_retries_str = - if let Some(max_retries) = self.config.reconnection.max_retries { - max_retries.to_string() - } else { - "unlimited".to_string() - }; - - let interval_str = self.config.reconnection.interval.as_human_time_string(); - if unlimited_retries || retry_count < max_retries { - retry_count += 1; - info!( - "Retrying to connect to server ({retry_count}/{max_retries_str}): {} in: {interval_str}", - self.config.server_address, - ); - sleep(self.config.reconnection.interval.get_duration()).await; - continue; + ClientState::Connected + | ClientState::Authenticating + | ClientState::Authenticated => { + trace!("Client is already connected."); + return Ok(()); } - - self.set_state(ClientState::Disconnected).await; - self.publish_event(DiagnosticEvent::Disconnected).await; - return Err(IggyError::CannotEstablishConnection); + ClientState::Connecting => { + trace!("Client is already connecting."); + return Ok(()); + } + _ => {} } - connection = connection_result.map_err(|error| { - error!("Failed to establish QUIC connection: {error}"); - IggyError::CannotEstablishConnection - })?; - remote_address = connection.remote_address(); - break; - } - - let now = IggyTimestamp::now(); - info!("{NAME} client has connected to server: 
{remote_address} at {now}",); - self.set_state(ClientState::Connected).await; - self.connection.lock().await.replace(connection); - self.connected_at.lock().await.replace(now); - self.publish_event(DiagnosticEvent::Connected).await; - - match &self.config.auto_login { - AutoLogin::Disabled => { - info!("Automatic sign-in is disabled."); - Ok(()) + self.set_state(ClientState::Connecting).await; + if let Some(connected_at) = self.connected_at.lock().await.as_ref() { + let now = IggyTimestamp::now(); + let elapsed = now.as_micros() - connected_at.as_micros(); + let interval = self.config.reconnection.reestablish_after.as_micros(); + trace!( + "Elapsed time since last connection: {}", + IggyDuration::from(elapsed) + ); + if elapsed < interval { + let remaining = IggyDuration::from(interval - elapsed); + info!("Trying to connect to the server in: {remaining}",); + sleep(remaining.get_duration()).await; + } } - AutoLogin::Enabled(credentials) => { + + let mut retry_count = 0; + let connection; + let remote_address; + loop { + let server_address = *self.current_server_address.lock().await; info!( - "{NAME} client: {} is signing in...", - self.config.client_address + "{NAME} client is connecting to server: {}...", + server_address ); - self.set_state(ClientState::Authenticating).await; - match credentials { - Credentials::UsernamePassword(username, password) => { - self.login_user(username, password).await?; - self.publish_event(DiagnosticEvent::SignedIn).await; - info!( - "{NAME} client: {} has signed in with the user credentials, username: {username}", - self.config.client_address - ); - Ok(()) + let connection_result = self + .endpoint + .connect(server_address, &self.config.server_name) + .unwrap() + .await; + + if connection_result.is_err() { + error!("Failed to connect to server: {}", server_address); + if !self.config.reconnection.enabled { + warn!("Automatic reconnection is disabled."); + return Err(IggyError::CannotEstablishConnection); } - 
Credentials::PersonalAccessToken(token) => { - self.login_with_personal_access_token(token).await?; - self.publish_event(DiagnosticEvent::SignedIn).await; + + let unlimited_retries = self.config.reconnection.max_retries.is_none(); + let max_retries = self.config.reconnection.max_retries.unwrap_or_default(); + let max_retries_str = + if let Some(max_retries) = self.config.reconnection.max_retries { + max_retries.to_string() + } else { + "unlimited".to_string() + }; + + let interval_str = self.config.reconnection.interval.as_human_time_string(); + if unlimited_retries || retry_count < max_retries { + retry_count += 1; info!( - "{NAME} client: {} has signed in with a personal access token.", - self.config.client_address + "Retrying to connect to server ({retry_count}/{max_retries_str}): {} in: {interval_str}", + server_address, ); - Ok(()) + sleep(self.config.reconnection.interval.get_duration()).await; + continue; } + + self.set_state(ClientState::Disconnected).await; + self.publish_event(DiagnosticEvent::Disconnected).await; + return Err(IggyError::CannotEstablishConnection); } + + connection = connection_result.map_err(|error| { + error!("Failed to establish QUIC connection: {error}"); + IggyError::CannotEstablishConnection + })?; + remote_address = connection.remote_address(); + break; } - } + + let now = IggyTimestamp::now(); + info!("{NAME} client has connected to server: {remote_address} at {now}",); + self.set_state(ClientState::Connected).await; + self.connection.lock().await.replace(connection); + self.connected_at.lock().await.replace(now); + self.publish_event(DiagnosticEvent::Connected).await; + + // Handle auto-login + let should_redirect = match &self.config.auto_login { + AutoLogin::Disabled => { + info!("Automatic sign-in is disabled."); + false + } + AutoLogin::Enabled(credentials) => { + info!( + "{NAME} client: {} is signing in...", + self.config.client_address + ); + self.set_state(ClientState::Authenticating).await; + match credentials { + 
Credentials::UsernamePassword(username, password) => { + self.login_user(username, password).await?; + self.publish_event(DiagnosticEvent::SignedIn).await; + info!( + "{NAME} client: {} has signed in with the user credentials, username: {username}", + self.config.client_address + ); + } + Credentials::PersonalAccessToken(token) => { + self.login_with_personal_access_token(token).await?; + self.publish_event(DiagnosticEvent::SignedIn).await; + info!( + "{NAME} client: {} has signed in with a personal access token.", + self.config.client_address + ); + } + } + + // Check if we need to redirect to leader + let current_address = self.current_server_address.lock().await.to_string(); + let leader_address = + check_and_redirect_to_leader(self, &current_address).await?; + if let Some(new_leader_address) = leader_address { + let mut redirection_state = self.leader_redirection_state.lock().await; + if !redirection_state.can_redirect() { + warn!( + "Maximum leader redirections reached, continuing with current connection" + ); + false + } else { + info!( + "Current node is not leader, redirecting to leader at: {}", + new_leader_address + ); + redirection_state.increment_redirect(new_leader_address.clone()); + drop(redirection_state); + + // Disconnect from current node + self.disconnect().await?; + + // Update current server address - parse the new address + let new_socket_addr = + new_leader_address.parse::<SocketAddr>().map_err(|e| { + error!( + "Failed to parse leader address {}: {}", + new_leader_address, e + ); + IggyError::InvalidServerAddress + })?; + *self.current_server_address.lock().await = new_socket_addr; + + true // Need to redirect + } + } else { + // Reset redirection state on successful connection to leader + self.leader_redirection_state.lock().await.reset(); + false + } + } + }; + + if should_redirect { + // Continue to next iteration of the loop to reconnect to leader + continue; + } + + return Ok(()); + } // End 
of leader detection loop } async fn
shutdown(&self) -> Result<(), IggyError> { diff --git a/core/sdk/src/tcp/tcp_client.rs b/core/sdk/src/tcp/tcp_client.rs index c4adc3f5f..e57eb2ccc 100644 --- a/core/sdk/src/tcp/tcp_client.rs +++ b/core/sdk/src/tcp/tcp_client.rs @@ -16,6 +16,7 @@ * under the License. */ +use crate::leader_aware::{LeaderRedirectionState, check_and_redirect_to_leader}; use crate::prelude::Client; use crate::prelude::TcpClientConfig; use crate::tcp::tcp_connection_stream::TcpConnectionStream; @@ -54,6 +55,8 @@ pub struct TcpClient { client_address: Mutex<Option<SocketAddr>>, events: (Sender<DiagnosticEvent>, Receiver<DiagnosticEvent>), connected_at: Mutex<Option<IggyTimestamp>>, + leader_redirection_state: Mutex<LeaderRedirectionState>, + pub(crate) current_server_address: Mutex<String>, } impl Default for TcpClient { @@ -129,9 +132,10 @@ impl BinaryTransport for TcpClient { { let client_address = self.get_client_address_value().await; + let server_address = self.current_server_address.lock().await.clone(); info!( "Reconnecting to the server: {} by client: {client_address}...", - self.config.server_address + server_address ); } @@ -191,6 +195,7 @@ impl TcpClient { /// Create a new TCP client based on the provided configuration. 
pub fn create(config: Arc<TcpClientConfig>) -> Result<Self, IggyError> { + let server_address = config.server_address.clone(); Ok(Self { config, client_address: Mutex::new(None), @@ -198,6 +203,8 @@ impl TcpClient { state: Mutex::new(ClientState::Disconnected), events: broadcast(1000), connected_at: Mutex::new(None), + leader_redirection_state: Mutex::new(LeaderRedirectionState::new()), + current_server_address: Mutex::new(server_address), }) } @@ -219,6 +226,13 @@ impl TcpClient { status, IggyError::from_code_as_string(status) ) + } else if status == IggyErrorDiscriminants::FeatureUnavailable as u32 { + // Feature unavailable - likely clustering is disabled on server + tracing::debug!( + "Feature unavailable on server: {} ({})", + status, + IggyError::from_code_as_string(status) + ) } else { error!( "Received an invalid response with status: {} ({}).", @@ -242,209 +256,257 @@ impl TcpClient { } async fn connect(&self) -> Result<(), IggyError> { - match self.get_state().await { - ClientState::Shutdown => { - trace!("Cannot connect. 
Client is shutdown."); - return Err(IggyError::ClientShutdown); - } - ClientState::Connected | ClientState::Authenticating | ClientState::Authenticated => { - let client_address = self.get_client_address_value().await; - trace!("Client: {client_address} is already connected."); - return Ok(()); - } - ClientState::Connecting => { - trace!("Client is already connecting."); - return Ok(()); - } - _ => {} - } - - self.set_state(ClientState::Connecting).await; - if let Some(connected_at) = self.connected_at.lock().await.as_ref() { - let now = IggyTimestamp::now(); - let elapsed = now.as_micros() - connected_at.as_micros(); - let interval = self.config.reconnection.reestablish_after.as_micros(); - trace!( - "Elapsed time since last connection: {}", - IggyDuration::from(elapsed) - ); - if elapsed < interval { - let remaining = IggyDuration::from(interval - elapsed); - info!("Trying to connect to the server in: {remaining}",); - sleep(remaining.get_duration()).await; - } - } + self.connect_with_leader_detection().await + } - let tls_enabled = self.config.tls_enabled; - let mut retry_count = 0; - let connection_stream: ConnectionStreamKind; - let remote_address; - let client_address; + async fn connect_with_leader_detection(&self) -> Result<(), IggyError> { + // Leader detection loop loop { - info!( - "{NAME} client is connecting to server: {}...", - self.config.server_address - ); + match self.get_state().await { + ClientState::Shutdown => { + trace!("Cannot connect. 
Client is shutdown."); + return Err(IggyError::ClientShutdown); + } + ClientState::Connected + | ClientState::Authenticating + | ClientState::Authenticated => { + let client_address = self.get_client_address_value().await; + trace!("Client: {client_address} is already connected."); + return Ok(()); + } + ClientState::Connecting => { + trace!("Client is already connecting."); + return Ok(()); + } + _ => {} + } - let connection = TcpStream::connect(&self.config.server_address).await; - if let Err(err) = &connection { - error!( - "Failed to connect to server: {}. Error: {}", - self.config.server_address, err + self.set_state(ClientState::Connecting).await; + if let Some(connected_at) = self.connected_at.lock().await.as_ref() { + let now = IggyTimestamp::now(); + let elapsed = now.as_micros() - connected_at.as_micros(); + let interval = self.config.reconnection.reestablish_after.as_micros(); + trace!( + "Elapsed time since last connection: {}", + IggyDuration::from(elapsed) ); - if !self.config.reconnection.enabled { - warn!("Automatic reconnection is disabled."); - return Err(IggyError::CannotEstablishConnection); + if elapsed < interval { + let remaining = IggyDuration::from(interval - elapsed); + info!("Trying to connect to the server in: {remaining}",); + sleep(remaining.get_duration()).await; } + } - let unlimited_retries = self.config.reconnection.max_retries.is_none(); - let max_retries = self.config.reconnection.max_retries.unwrap_or_default(); - let max_retries_str = - if let Some(max_retries) = self.config.reconnection.max_retries { - max_retries.to_string() - } else { - "unlimited".to_string() - }; - - let interval_str = self.config.reconnection.interval.as_human_time_string(); - if unlimited_retries || retry_count < max_retries { - retry_count += 1; - info!( - "Retrying to connect to server ({retry_count}/{max_retries_str}): {} in: {interval_str}", - self.config.server_address, + let tls_enabled = self.config.tls_enabled; + let mut retry_count = 0; + let 
connection_stream: ConnectionStreamKind; + let remote_address; + let client_address; + loop { + let server_address = self.current_server_address.lock().await.clone(); + info!( + "{NAME} client is connecting to server: {}...", + server_address + ); + + let connection = TcpStream::connect(&server_address).await; + if let Err(err) = &connection { + error!( + "Failed to connect to server: {}. Error: {}", + server_address, err ); - sleep(self.config.reconnection.interval.get_duration()).await; - continue; + if !self.config.reconnection.enabled { + warn!("Automatic reconnection is disabled."); + return Err(IggyError::CannotEstablishConnection); + } + + let unlimited_retries = self.config.reconnection.max_retries.is_none(); + let max_retries = self.config.reconnection.max_retries.unwrap_or_default(); + let max_retries_str = + if let Some(max_retries) = self.config.reconnection.max_retries { + max_retries.to_string() + } else { + "unlimited".to_string() + }; + + let interval_str = self.config.reconnection.interval.as_human_time_string(); + if unlimited_retries || retry_count < max_retries { + retry_count += 1; + info!( + "Retrying to connect to server ({retry_count}/{max_retries_str}): {} in: {interval_str}", + server_address, + ); + sleep(self.config.reconnection.interval.get_duration()).await; + continue; + } + + self.set_state(ClientState::Disconnected).await; + self.publish_event(DiagnosticEvent::Disconnected).await; + return Err(IggyError::CannotEstablishConnection); } - self.set_state(ClientState::Disconnected).await; - self.publish_event(DiagnosticEvent::Disconnected).await; - return Err(IggyError::CannotEstablishConnection); - } + let stream = connection.map_err(|error| { + error!("Failed to establish TCP connection to the server: {error}",); + IggyError::CannotEstablishConnection + })?; + client_address = stream.local_addr().map_err(|error| { + error!("Failed to get the local address of the client: {error}",); + IggyError::CannotEstablishConnection + })?; + 
remote_address = stream.peer_addr().map_err(|error| { + error!("Failed to get the remote address of the server: {error}",); + IggyError::CannotEstablishConnection + })?; + self.client_address.lock().await.replace(client_address); - let stream = connection.map_err(|error| { - error!("Failed to establish TCP connection to the server: {error}",); - IggyError::CannotEstablishConnection - })?; - client_address = stream.local_addr().map_err(|error| { - error!("Failed to get the local address of the client: {error}",); - IggyError::CannotEstablishConnection - })?; - remote_address = stream.peer_addr().map_err(|error| { - error!("Failed to get the remote address of the server: {error}",); - IggyError::CannotEstablishConnection - })?; - self.client_address.lock().await.replace(client_address); - - if let Err(e) = stream.set_nodelay(self.config.nodelay) { - error!("Failed to set the nodelay option on the client: {e}, continuing...",); - } + if let Err(e) = stream.set_nodelay(self.config.nodelay) { + error!("Failed to set the nodelay option on the client: {e}, continuing...",); + } - if !tls_enabled { - connection_stream = - ConnectionStreamKind::Tcp(TcpConnectionStream::new(client_address, stream)); - break; - } + if !tls_enabled { + connection_stream = + ConnectionStreamKind::Tcp(TcpConnectionStream::new(client_address, stream)); + break; + } - let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); - - let config = if self.config.tls_validate_certificate { - let mut root_cert_store = rustls::RootCertStore::empty(); - if let Some(certificate_path) = &self.config.tls_ca_file { - for cert in - CertificateDer::pem_file_iter(certificate_path).map_err(|error| { - error!("Failed to read the CA file: {certificate_path}. {error}",); - IggyError::InvalidTlsCertificatePath - })? 
- { - let certificate = cert.map_err(|error| { + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + + let config = if self.config.tls_validate_certificate { + let mut root_cert_store = rustls::RootCertStore::empty(); + if let Some(certificate_path) = &self.config.tls_ca_file { + for cert in + CertificateDer::pem_file_iter(certificate_path).map_err(|error| { + error!("Failed to read the CA file: {certificate_path}. {error}",); + IggyError::InvalidTlsCertificatePath + })? + { + let certificate = cert.map_err(|error| { error!( "Failed to read a certificate from the CA file: {certificate_path}. {error}", ); IggyError::InvalidTlsCertificate })?; - root_cert_store.add(certificate).map_err(|error| { + root_cert_store.add(certificate).map_err(|error| { error!( "Failed to add a certificate to the root certificate store. {error}", ); IggyError::InvalidTlsCertificate })?; + } + } else { + root_cert_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); } + + rustls::ClientConfig::builder() + .with_root_certificates(root_cert_store) + .with_no_client_auth() + } else { + use crate::tcp::tcp_tls_verifier::NoServerVerification; + rustls::ClientConfig::builder() + .dangerous() + .with_custom_certificate_verifier(Arc::new(NoServerVerification)) + .with_no_client_auth() + }; + let connector = TlsConnector::from(Arc::new(config)); + let tls_domain = if self.config.tls_domain.is_empty() { + // Extract hostname/IP from server_address when tls_domain is not specified + server_address + .split(':') + .next() + .unwrap_or(&server_address) + .to_string() } else { - root_cert_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); + self.config.tls_domain.to_owned() + }; + let domain = ServerName::try_from(tls_domain).map_err(|error| { + error!("Failed to create a server name from the domain. 
{error}",); + IggyError::InvalidTlsDomain + })?; + let stream = connector.connect(domain, stream).await.map_err(|error| { + error!("Failed to establish a TLS connection to the server: {error}",); + IggyError::CannotEstablishConnection + })?; + connection_stream = ConnectionStreamKind::TcpTls(TcpTlsConnectionStream::new( + client_address, + TlsStream::Client(stream), + )); + break; + } + + let now = IggyTimestamp::now(); + info!( + "{NAME} client: {client_address} has connected to server: {remote_address} at: {now}", + ); + self.stream.lock().await.replace(connection_stream); + self.set_state(ClientState::Connected).await; + self.connected_at.lock().await.replace(now); + self.publish_event(DiagnosticEvent::Connected).await; + // Handle auto-login + let should_redirect = match &self.config.auto_login { + AutoLogin::Disabled => { + info!("Automatic sign-in is disabled."); + false } + AutoLogin::Enabled(credentials) => { + info!("{NAME} client: {client_address} is signing in..."); + self.set_state(ClientState::Authenticating).await; + match credentials { + Credentials::UsernamePassword(username, password) => { + self.login_user(username, password).await?; + info!( + "{NAME} client: {client_address} has signed in with the user credentials, username: {username}", + ); + } + Credentials::PersonalAccessToken(token) => { + self.login_with_personal_access_token(token).await?; + info!( + "{NAME} client: {client_address} has signed in with a personal access token.", + ); + } + } - rustls::ClientConfig::builder() - .with_root_certificates(root_cert_store) - .with_no_client_auth() - } else { - use crate::tcp::tcp_tls_verifier::NoServerVerification; - rustls::ClientConfig::builder() - .dangerous() - .with_custom_certificate_verifier(Arc::new(NoServerVerification)) - .with_no_client_auth() - }; - let connector = TlsConnector::from(Arc::new(config)); - let tls_domain = if self.config.tls_domain.is_empty() { - // Extract hostname/IP from server_address when tls_domain is not 
specified - self.config - .server_address - .split(':') - .next() - .unwrap_or(&self.config.server_address) - .to_string() - } else { - self.config.tls_domain.to_owned() - }; - let domain = ServerName::try_from(tls_domain).map_err(|error| { - error!("Failed to create a server name from the domain. {error}",); - IggyError::InvalidTlsDomain - })?; - let stream = connector.connect(domain, stream).await.map_err(|error| { - error!("Failed to establish a TLS connection to the server: {error}",); - IggyError::CannotEstablishConnection - })?; - connection_stream = ConnectionStreamKind::TcpTls(TcpTlsConnectionStream::new( - client_address, - TlsStream::Client(stream), - )); - break; - } + // Check if we need to redirect to leader + let current_address = self.current_server_address.lock().await.clone(); + let leader_address = + check_and_redirect_to_leader(self, ¤t_address).await?; + if let Some(new_leader_address) = leader_address { + let mut redirection_state = self.leader_redirection_state.lock().await; + if !redirection_state.can_redirect() { + warn!( + "Maximum leader redirections reached, continuing with current connection" + ); + false + } else { + info!( + "Current node is not leader, redirecting to leader at: {}", + new_leader_address + ); + redirection_state.increment_redirect(new_leader_address.clone()); + drop(redirection_state); - let now = IggyTimestamp::now(); - info!( - "{NAME} client: {client_address} has connected to server: {remote_address} at: {now}", - ); - self.stream.lock().await.replace(connection_stream); - self.set_state(ClientState::Connected).await; - self.connected_at.lock().await.replace(now); - self.publish_event(DiagnosticEvent::Connected).await; - match &self.config.auto_login { - AutoLogin::Disabled => { - info!("Automatic sign-in is disabled."); - Ok(()) - } - AutoLogin::Enabled(credentials) => { - info!("{NAME} client: {client_address} is signing in..."); - self.set_state(ClientState::Authenticating).await; - match credentials { - 
Credentials::UsernamePassword(username, password) => { - self.login_user(username, password).await?; - info!( - "{NAME} client: {client_address} has signed in with the user credentials, username: {username}", - ); - Ok(()) - } - Credentials::PersonalAccessToken(token) => { - self.login_with_personal_access_token(token).await?; - info!( - "{NAME} client: {client_address} has signed in with a personal access token.", - ); - Ok(()) + // Disconnect from current node + self.disconnect().await?; + + // Update current server address + *self.current_server_address.lock().await = new_leader_address; + + true // Need to redirect + } + } else { + // Reset redirection state on successful connection to leader + self.leader_redirection_state.lock().await.reset(); + false } } + }; + + if should_redirect { + // Continue to next iteration of the loop to reconnect to leader + continue; } - } + + return Ok(()); + } // End of leader detection loop } async fn disconnect(&self) -> Result<(), IggyError> { diff --git a/core/server/src/args.rs b/core/server/src/args.rs index 3a6cfdb96..64cddb1e7 100644 --- a/core/server/src/args.rs +++ b/core/server/src/args.rs @@ -105,4 +105,21 @@ pub struct Args { /// iggy-server --with-default-root-credentials # Use 'iggy/iggy' as root credentials #[arg(long, default_value_t = false, verbatim_doc_comment)] pub with_default_root_credentials: bool, + + /// Run server as a follower node (FOR TESTING LEADER REDIRECTION) + /// + /// When this flag is set, the server will report itself as a follower node + /// in cluster metadata responses. This is useful for testing leader-aware + /// client connections and redirection logic. 
+ /// + /// The server will return mock cluster metadata showing: + /// - This server as a follower node + /// - A hypothetical leader node on port 8090 (or configured TCP port) + /// + /// Examples: + /// iggy-server # Run as leader (default) + /// iggy-server --follower # Run as follower + /// IGGY_TCP_ADDRESS=127.0.0.1:8091 iggy-server --follower # Follower on port 8091 + #[arg(long, default_value_t = false, verbatim_doc_comment)] + pub follower: bool, } diff --git a/core/server/src/main.rs b/core/server/src/main.rs index becbfaf50..896eace55 100644 --- a/core/server/src/main.rs +++ b/core/server/src/main.rs @@ -82,6 +82,7 @@ async fn main() -> Result<(), ServerError> { ); } let args = Args::parse(); + let is_follower = args.follower; // FIRST DISCRETE LOADING STEP. // Load config and create directories. @@ -115,6 +116,10 @@ async fn main() -> Result<(), ServerError> { // From this point on, we can use tracing macros to log messages. logging.late_init(config.system.get_system_path(), &config.system.logging)?; + if is_follower { + info!("Server is running in FOLLOWER mode for testing leader redirection"); + } + if args.with_default_root_credentials { let username_set = std::env::var("IGGY_ROOT_USERNAME").is_ok(); let password_set = std::env::var("IGGY_ROOT_PASSWORD").is_ok(); @@ -357,6 +362,7 @@ async fn main() -> Result<(), ServerError> { .encryptor(encryptor) .version(current_version) .metrics(metrics) + .is_follower(is_follower) .build(); let shard = Rc::new(shard); diff --git a/core/server/src/quic/listener.rs b/core/server/src/quic/listener.rs index 57140e68a..8f842fecf 100644 --- a/core/server/src/quic/listener.rs +++ b/core/server/src/quic/listener.rs @@ -25,8 +25,7 @@ use crate::streaming::session::Session; use anyhow::anyhow; use compio_quic::{Connection, Endpoint, RecvStream, SendStream}; use futures::FutureExt; -use iggy_common::IggyError; -use iggy_common::TransportProtocol; +use iggy_common::{GET_CLUSTER_METADATA_CODE, IggyError, TransportProtocol}; 
use std::rc::Rc; use tracing::{debug, error, info, trace}; @@ -198,21 +197,32 @@ async fn handle_stream( Ok(()) } Err(e) => { - error!( - "Command was not handled successfully, session: {:?}, error: {e}.", - session - ); - // Only return a connection-terminating error for client not found - if let IggyError::ClientNotFound(_) = e { - sender.send_error_response(e.clone()).await?; - trace!("QUIC error response was sent."); - error!("Session will be deleted."); - Err(anyhow!("Client not found: {e}")) - } else { - // For all other errors, send response and continue the connection + // Special handling for GetClusterMetadata when clustering is disabled + if code == GET_CLUSTER_METADATA_CODE && matches!(e, IggyError::FeatureUnavailable) { + debug!( + "GetClusterMetadata command not available (clustering disabled), session: {:?}.", + session + ); sender.send_error_response(e).await?; trace!("QUIC error response was sent."); Ok(()) + } else { + error!( + "Command was not handled successfully, session: {:?}, error: {e}.", + session + ); + // Only return a connection-terminating error for client not found + if let IggyError::ClientNotFound(_) = e { + sender.send_error_response(e.clone()).await?; + trace!("QUIC error response was sent."); + error!("Session will be deleted."); + Err(anyhow!("Client not found: {e}")) + } else { + // For all other errors, send response and continue the connection + sender.send_error_response(e).await?; + trace!("QUIC error response was sent."); + Ok(()) + } } } } diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs index 37e3099ec..98914543e 100644 --- a/core/server/src/shard/builder.rs +++ b/core/server/src/shard/builder.rs @@ -48,6 +48,7 @@ pub struct IggyShardBuilder { encryptor: Option<EncryptorKind>, version: Option<SemanticVersion>, metrics: Option<Metrics>, + is_follower: bool, } impl IggyShardBuilder { @@ -109,6 +110,11 @@ impl IggyShardBuilder { self } + pub fn is_follower(mut self, is_follower: bool) -> Self 
{ + self.is_follower = is_follower; + self + } + // TODO: Too much happens in there, some of those bootstrapping logic should be moved outside. pub fn build(self) -> IggyShard { let id = self.id.unwrap(); @@ -161,6 +167,7 @@ impl IggyShardBuilder { stop_receiver, messages_receiver: Cell::new(Some(frame_receiver)), metrics, + is_follower: self.is_follower, is_shutting_down: AtomicBool::new(false), tcp_bound_address: Cell::new(None), quic_bound_address: Cell::new(None), diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index 9d272c417..64243f51e 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -86,6 +86,7 @@ pub struct IggyShard { pub(crate) permissioner: RefCell<Permissioner>, pub(crate) users: Users, pub(crate) metrics: Metrics, + pub(crate) is_follower: bool, pub messages_receiver: Cell<Option<Receiver<ShardFrame>>>, pub(crate) stop_receiver: StopReceiver, pub(crate) is_shutting_down: AtomicBool, diff --git a/core/server/src/shard/system/cluster.rs b/core/server/src/shard/system/cluster.rs index 1619bb8aa..0003a24b5 100644 --- a/core/server/src/shard/system/cluster.rs +++ b/core/server/src/shard/system/cluster.rs @@ -18,6 +18,7 @@ use crate::shard::IggyShard; use crate::streaming::session::Session; +use err_trail::ErrContext; use iggy_common::{ ClusterMetadata, ClusterNode, ClusterNodeRole, ClusterNodeStatus, IggyError, TransportProtocol, }; @@ -35,27 +36,46 @@ impl IggyShard { // TODO(hubcio): Clustering is not yet implemented // The leader/follower as well as node status are currently placeholder implementations. 
- let name = self.config.cluster.name.clone(); - let id = self.config.cluster.id; + let cluster_name = self.config.cluster.name.clone(); + let cluster_id = self.config.cluster.id; - // Parse transport string to TransportProtocol enum let transport = TransportProtocol::from_str(&self.config.cluster.transport) + .with_error(|e| { + format!( + "Invalid cluster transport protocol: {}, error: {}", + self.config.cluster.transport, e + ) + }) .map_err(|_| IggyError::InvalidConfiguration)?; + let own_node_id = self.config.cluster.node.id; + let nodes: Vec<ClusterNode> = self .config .cluster .nodes .iter() .map(|node_config| { - let role = if node_config.id == 1 { - ClusterNodeRole::Leader + let (role, status) = if node_config.id == own_node_id { + ( + if self.is_follower { + ClusterNodeRole::Follower + } else { + ClusterNodeRole::Leader + }, + ClusterNodeStatus::Healthy, + ) } else { - ClusterNodeRole::Follower + ( + if self.is_follower { + ClusterNodeRole::Leader + } else { + ClusterNodeRole::Follower + }, + ClusterNodeStatus::Healthy, + ) }; - let status = ClusterNodeStatus::Healthy; - ClusterNode { id: node_config.id, name: node_config.name.clone(), @@ -67,8 +87,8 @@ impl IggyShard { .collect(); Ok(ClusterMetadata { - name, - id, + name: cluster_name, + id: cluster_id, transport, nodes, }) diff --git a/core/server/src/tcp/connection_handler.rs b/core/server/src/tcp/connection_handler.rs index 4df84fa0d..30e142e44 100644 --- a/core/server/src/tcp/connection_handler.rs +++ b/core/server/src/tcp/connection_handler.rs @@ -25,7 +25,7 @@ use crate::tcp::connection_handler::command::ServerCommand; use async_channel::Receiver; use bytes::BytesMut; use futures::FutureExt; -use iggy_common::IggyError; +use iggy_common::{GET_CLUSTER_METADATA_CODE, IggyError}; use std::io::ErrorKind; use std::rc::Rc; use tracing::{debug, error, info}; @@ -89,19 +89,30 @@ pub(crate) async fn handle_connection( ); } Err(error) => { - error!( - "Command with code {cmd_code} was not handled 
successfully, session: {session}, error: {error}." - ); - if let IggyError::ClientNotFound(_) = error { + // Special handling for GetClusterMetadata when clustering is disabled + if cmd_code == GET_CLUSTER_METADATA_CODE + && matches!(error, IggyError::FeatureUnavailable) + { + debug!( + "GetClusterMetadata command not available (clustering disabled), session: {session}." + ); sender.send_error_response(error).await?; debug!("TCP error response was sent to: {session}."); - error!("Session: {session} will be deleted."); - return Err(ConnectionError::from(IggyError::ClientNotFound( - session.client_id, - ))); } else { - sender.send_error_response(error).await?; - debug!("TCP error response was sent to: {session}."); + error!( + "Command with code {cmd_code} was not handled successfully, session: {session}, error: {error}." + ); + if let IggyError::ClientNotFound(_) = error { + sender.send_error_response(error).await?; + debug!("TCP error response was sent to: {session}."); + error!("Session: {session} will be deleted."); + return Err(ConnectionError::from(IggyError::ClientNotFound( + session.client_id, + ))); + } else { + sender.send_error_response(error).await?; + debug!("TCP error response was sent to: {session}."); + } } } }
