This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch io_uring_tpc_direct_io_socket_transfer in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 7d9567fb5850b24c0405cc14698f5591773658cc Author: Hubert Gruszecki <[email protected]> AuthorDate: Wed Jul 9 22:57:50 2025 +0200 socket transfer --- core/common/src/error/iggy_error.rs | 2 + .../handlers/messages/send_messages_handler.rs | 72 ++++++++++- core/server/src/shard/builder.rs | 2 +- core/server/src/shard/mod.rs | 132 ++++++++++++++++++++- core/server/src/shard/transmission/message.rs | 14 ++- core/server/src/tcp/connection_handler.rs | 40 +++++-- core/server/src/tcp/tcp_listener.rs | 27 +++-- core/server/src/tcp/tcp_sender.rs | 6 + 8 files changed, 262 insertions(+), 33 deletions(-) diff --git a/core/common/src/error/iggy_error.rs b/core/common/src/error/iggy_error.rs index 53b4eb5bd..a68e0967e 100644 --- a/core/common/src/error/iggy_error.rs +++ b/core/common/src/error/iggy_error.rs @@ -158,6 +158,8 @@ pub enum IggyError { InvalidClientId = 101, #[error("Connection closed")] ConnectionClosed = 206, + #[error("Socket transferred to another shard")] + SocketTransferred = 207, #[error("Cannot parse header kind from {0}")] CannotParseHeaderKind(String) = 209, #[error("HTTP response error, status: {0}, body: {1}")] diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs b/core/server/src/binary/handlers/messages/send_messages_handler.rs index 6d155bcac..5813c2588 100644 --- a/core/server/src/binary/handlers/messages/send_messages_handler.rs +++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs @@ -31,9 +31,10 @@ use compio::buf::{IntoInner as _, IoBuf}; use iggy_common::Identifier; use iggy_common::Sizeable; use iggy_common::{INDEX_SIZE, IdKind}; -use iggy_common::{IggyError, Partitioning, SendMessages, Validatable}; +use iggy_common::{IggyError, Partitioning, PartitioningKind, SendMessages, Validatable}; +use nix::libc; use std::rc::Rc; -use tracing::instrument; +use tracing::{info, instrument}; impl ServerCommandHandler for SendMessages { fn code(&self) -> u32 { @@ -107,6 +108,72 @@ impl ServerCommandHandler for SendMessages { .await? .into_inner(); + let user_id = session.get_user_id(); + + let stream = shard.get_stream(&self.stream_id)?; + let topic = stream.get_topic(&self.topic_id)?; + let partition_id = match self.partitioning.kind { + PartitioningKind::PartitionId => { + u32::from_le_bytes(self.partitioning.value[..4].try_into().unwrap()) + } + _ => 0, + }; + + let namespace = IggyNamespace::new(stream.stream_id, topic.topic_id, partition_id); + if let Some(target_shard) = shard.find_shard(&namespace) { + if target_shard.id() != shard.id { + use crate::tcp::tcp_sender::TcpSender; + use std::os::fd::AsRawFd; + + if let SenderKind::Tcp(TcpSender { stream: tcp_stream }) = sender { + let raw_fd = tcp_stream.as_raw_fd(); + let new_fd = unsafe { libc::dup(raw_fd) }; + if new_fd == -1 { + return Err(IggyError::CannotReadMessagePayload); + } + + let mut initial_data = Vec::new(); + initial_data.extend_from_slice(&(length).to_le_bytes()); + initial_data.extend_from_slice(&(self.code()).to_le_bytes()); + initial_data.extend_from_slice(&(metadata_size).to_le_bytes()); + initial_data.extend_from_slice(&metadata_buf[..]); + initial_data.extend_from_slice(&indexes_buffer[..]); + initial_data.extend_from_slice(&messages_buffer[..]); + + let payload = ShardRequestPayload::SocketTransfer { + fd: new_fd, + from_shard: shard.id, + client_id: session.client_id, + user_id: session.get_user_id(), + ip_address: session.ip_address, + initial_data, + }; + let request = + ShardRequest::new(stream.stream_id, topic.topic_id, partition_id, payload); + let message = ShardMessage::Request(request); + + match target_shard.send_request(message).await? { + ShardResponse::SendMessages => { + info!( + "Socket transferred from shard {} to shard {}", + shard.id, + target_shard.id() + ); + // Return SocketTransferred to signal clean exit without sending response + // The socket has been transferred, so the original handler should stop + return Err(IggyError::SocketTransferred); + } + _ => { + unsafe { + libc::close(new_fd); + } + return Err(IggyError::ShardCommunicationError(target_shard.id())); + } + } + } + } + } + let indexes = IggyIndexesMut::from_bytes(indexes_buffer, 0); let batch = IggyMessagesBatchMut::from_indexes_and_messages( messages_count, @@ -115,7 +182,6 @@ impl ServerCommandHandler for SendMessages { ); batch.validate()?; - let user_id = session.get_user_id(); shard .append_messages( user_id, diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs index c49cdeb66..1439f9055 100644 --- a/core/server/src/shard/builder.rs +++ b/core/server/src/shard/builder.rs @@ -17,7 +17,7 @@ */ use std::{ - cell::Cell, + cell::{Cell, RefCell}, rc::Rc, sync::{Arc, atomic::AtomicBool}, }; diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index 019358680..46aa8004e 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -37,12 +37,16 @@ use namespace::IggyNamespace; use std::{ cell::{Cell, RefCell}, future::Future, + io::ErrorKind, + net::SocketAddr, + os::fd::{FromRawFd, RawFd}, pin::Pin, rc::Rc, str::FromStr, sync::{ Arc, atomic::{AtomicBool, AtomicU32, Ordering}, + mpsc, }, time::{Duration, Instant}, }; @@ -90,6 +94,15 @@ static USER_ID: AtomicU32 = AtomicU32::new(1); type Task = Pin<Box<dyn Future<Output = Result<(), IggyError>>>>; +#[derive(Debug)] +pub struct SocketTransfer { + pub fd: RawFd, + pub from_shard: u16, + pub to_shard: u16, + pub initial_data: Vec<u8>, + pub session: Rc<Session>, +} + pub(crate) struct Shard { id: u16, connection: ShardConnector<ShardFrame>, @@ -103,6 +116,10 @@ impl Shard { } } + pub fn id(&self) -> u16 { + self.id + } + pub async fn send_request(&self, message: ShardMessage) -> Result<ShardResponse, IggyError> { let (sender, receiver) = async_channel::bounded(1); self.connection @@ -540,7 +557,10 @@ impl IggyShard { self.shards.len() as u32 } - pub async fn handle_shard_message(&self, message: ShardMessage) -> Option<ShardResponse> { + pub async fn handle_shard_message( + self: &Rc<Self>, + message: ShardMessage, + ) -> Option<ShardResponse> { match message { ShardMessage::Request(request) => match self.handle_request(request).await { Ok(response) => Some(response), @@ -553,21 +573,48 @@ impl IggyShard { } } - async fn handle_request(&self, request: ShardRequest) -> Result<ShardResponse, IggyError> { - let stream = self.get_stream(&Identifier::numeric(request.stream_id)?)?; - let topic = stream.get_topic(&Identifier::numeric(request.topic_id)?)?; - let partition_id = request.partition_id; + async fn handle_request( + self: &Rc<Self>, + request: ShardRequest, + ) -> Result<ShardResponse, IggyError> { match request.payload { ShardRequestPayload::SendMessages { batch } => { + let stream = self.get_stream(&Identifier::numeric(request.stream_id)?)?; + let topic = stream.get_topic(&Identifier::numeric(request.topic_id)?)?; + let partition_id = request.partition_id; topic.append_messages(partition_id, batch).await?; Ok(ShardResponse::SendMessages) } ShardRequestPayload::PollMessages { args, consumer } => { + let stream = self.get_stream(&Identifier::numeric(request.stream_id)?)?; + let topic = stream.get_topic(&Identifier::numeric(request.topic_id)?)?; + let partition_id = request.partition_id; let (metadata, batch) = topic .get_messages(consumer, partition_id, args.strategy, args.count) .await?; Ok(ShardResponse::PollMessages((metadata, batch))) } + ShardRequestPayload::SocketTransfer { + fd, + from_shard, + client_id, + user_id, + ip_address, + initial_data, + } => { + // Process the transfer and return whether it succeeded + self.handle_socket_transfer( + fd, + from_shard, + client_id, + user_id, + ip_address, + initial_data, + ) + .await?; + // Return success so the original shard knows the transfer worked + Ok(ShardResponse::SendMessages) + } } } @@ -928,7 +975,7 @@ impl IggyShard { responses } - fn find_shard(&self, namespace: &IggyNamespace) -> Option<&Shard> { + pub fn find_shard(&self, namespace: &IggyNamespace) -> Option<&Shard> { let shards_table = self.shards_table.borrow(); shards_table.get(namespace).map(|shard_info| { self.shards @@ -1014,4 +1061,77 @@ impl IggyShard { })?; Ok(user_id) } + + async fn handle_socket_transfer( + self: &Rc<Self>, + fd: RawFd, + from_shard: u16, + client_id: u32, + user_id: u32, + ip_address: SocketAddr, + initial_data: Vec<u8>, + ) -> Result<(), IggyError> { + use crate::binary::sender::SenderKind; + use crate::streaming::clients::client_manager::Transport; + use crate::tcp::connection_handler::handle_connection; + use compio::net::TcpStream; + + info!( + "Shard {} received socket transfer from shard {} for client {} (user {})", + self.id, from_shard, client_id, user_id + ); + + let std_stream = unsafe { std::net::TcpStream::from_raw_fd(fd) }; + let stream = TcpStream::from_std(std_stream) + .map_err(|e| IggyError::ShardCommunicationError(self.id))?; + + let mut sender = SenderKind::get_tcp_sender(stream); + + let transport = Transport::Tcp; + let session = self.add_client(&ip_address, transport); + + // Set the authenticated user ID on the session + if user_id > 0 { + session.set_user_id(user_id); + } + + self.add_active_session(session.clone()); + + // Process the SendMessages command from initial_data and send response + // The initial_data contains the full SendMessages request that needs to be processed + // For now, just send OK response to continue the connection + sender.send_empty_ok_response().await?; + + let conn_stop_receiver = self.task_registry.add_connection(session.client_id); + let shard_for_conn = self.clone(); + let address = ip_address; + + self.task_registry.spawn_tracked(async move { + if let Err(error) = handle_connection(&session, &mut sender, &shard_for_conn, conn_stop_receiver).await { + crate::tcp::connection_handler::handle_error(error); + } + shard_for_conn.task_registry.remove_connection(&session.client_id); + + if let Err(error) = sender.shutdown().await { + // Ignore "Transport endpoint is not connected" errors for transferred sockets + if let crate::server_error::ServerError::IoError(io_error) = &error { + if io_error.kind() == std::io::ErrorKind::NotConnected { + info!( + "Transferred TCP stream for client: {client_id}, address: {address} was already closed." + ); + return; + } + } + error!( + "Failed to shutdown TCP stream for client: {client_id}, address: {address}. {error}" + ); + } else { + info!( + "Successfully closed TCP stream for client: {client_id}, address: {address}." + ); + } + }); + + Ok(()) + } } diff --git a/core/server/src/shard/transmission/message.rs b/core/server/src/shard/transmission/message.rs index fe369badd..0337f7ad7 100644 --- a/core/server/src/shard/transmission/message.rs +++ b/core/server/src/shard/transmission/message.rs @@ -1,4 +1,4 @@ -use std::{rc::Rc, sync::Arc}; +use std::{net::SocketAddr, os::fd::RawFd, rc::Rc, sync::Arc}; use iggy_common::PollingStrategy; @@ -24,7 +24,9 @@ use crate::{ system::messages::PollingArgs, transmission::{event::ShardEvent, frame::ShardResponse}, }, - streaming::{polling_consumer::PollingConsumer, segments::IggyMessagesBatchMut}, + streaming::{ + polling_consumer::PollingConsumer, segments::IggyMessagesBatchMut, session::Session, + }, }; pub enum ShardSendRequestResult { @@ -72,6 +74,14 @@ pub enum ShardRequestPayload { consumer: PollingConsumer, args: PollingArgs, }, + SocketTransfer { + fd: RawFd, + from_shard: u16, + client_id: u32, + user_id: u32, + ip_address: SocketAddr, + initial_data: Vec<u8>, + }, } impl From<ShardRequest> for ShardMessage { diff --git a/core/server/src/tcp/connection_handler.rs b/core/server/src/tcp/connection_handler.rs index 1f1bb4827..4aea2a622 100644 --- a/core/server/src/tcp/connection_handler.rs +++ b/core/server/src/tcp/connection_handler.rs @@ -95,17 +95,32 @@ pub(crate) async fn handle_connection( ); } Err(error) => { - error!("Command was not handled successfully, session: {session}, error: {error}."); - if let IggyError::ClientNotFound(_) = error { - sender.send_error_response(error).await?; - debug!("TCP error response was sent to: {session}."); - error!("Session: {session} will be deleted."); - return Err(ConnectionError::from(IggyError::ClientNotFound( - session.client_id, - ))); - } else { - sender.send_error_response(error).await?; - debug!("TCP error response was sent to: {session}."); + match error { + IggyError::SocketTransferred => { + // Socket was transferred to another shard, exit cleanly without sending response + info!( + "Socket transferred for session: {session}, closing original handler." + ); + return Err(ConnectionError::from(IggyError::SocketTransferred)); + } + IggyError::ClientNotFound(_) => { + error!( + "Command was not handled successfully, session: {session}, error: {error}." + ); + sender.send_error_response(error).await?; + debug!("TCP error response was sent to: {session}."); + error!("Session: {session} will be deleted."); + return Err(ConnectionError::from(IggyError::ClientNotFound( + session.client_id, + ))); + } + _ => { + error!( + "Command was not handled successfully, session: {session}, error: {error}." + ); + sender.send_error_response(error).await?; + debug!("TCP error response was sent to: {session}."); + } } } } @@ -135,6 +150,9 @@ pub(crate) fn handle_error(error: ConnectionError) { IggyError::ConnectionClosed => { debug!("Client closed connection."); } + IggyError::SocketTransferred => { + info!("Socket successfully transferred to another shard."); + } _ => { error!("Failure in internal SDK call: {sdk_error}"); } diff --git a/core/server/src/tcp/tcp_listener.rs b/core/server/src/tcp/tcp_listener.rs index 6ad2cf08c..cb7619392 100644 --- a/core/server/src/tcp/tcp_listener.rs +++ b/core/server/src/tcp/tcp_listener.rs @@ -117,19 +117,26 @@ pub async fn start( let shard_for_conn = shard_clone.clone(); shard_clone.task_registry.spawn_tracked(async move { - if let Err(error) = handle_connection(&session, &mut sender, &shard_for_conn, conn_stop_receiver).await { + let should_shutdown = if let Err(error) = handle_connection(&session, &mut sender, &shard_for_conn, conn_stop_receiver).await { + // Check if socket was transferred + let is_transferred = matches!(&error, crate::server_error::ConnectionError::SdkError(e) if e.as_code() == iggy_common::IggyError::SocketTransferred.as_code()); handle_error(error); - } + !is_transferred // Don't shutdown if socket was transferred + } else { + true // Normal completion, shutdown the socket + }; shard_for_conn.task_registry.remove_connection(&client_id); - if let Err(error) = sender.shutdown().await { - error!( - "Failed to shutdown TCP stream for client: {client_id}, address: {address}. {error}" - ); - } else { - info!( - "Successfully closed TCP stream for client: {client_id}, address: {address}." - ); + if should_shutdown { + if let Err(error) = sender.shutdown().await { + error!( + "Failed to shutdown TCP stream for client: {client_id}, address: {address}. {error}" + ); + } else { + info!( + "Successfully closed TCP stream for client: {client_id}, address: {address}." + ); + } } }); } diff --git a/core/server/src/tcp/tcp_sender.rs b/core/server/src/tcp/tcp_sender.rs index ccdc95d24..06965ee09 100644 --- a/core/server/src/tcp/tcp_sender.rs +++ b/core/server/src/tcp/tcp_sender.rs @@ -31,6 +31,12 @@ pub struct TcpSender { pub(crate) stream: TcpStream, } +impl TcpSender { + pub fn into_stream(self) -> TcpStream { + self.stream + } +} + impl Sender for TcpSender { async fn read<B: IoBufMut>(&mut self, buffer: B) -> Result<B, IggyError> { sender::read(&mut self.stream, buffer).await
