This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch io_uring_tpc_direct_io_socket_transfer
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 7d9567fb5850b24c0405cc14698f5591773658cc
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Wed Jul 9 22:57:50 2025 +0200

    socket transfer
---
 core/common/src/error/iggy_error.rs                |   2 +
 .../handlers/messages/send_messages_handler.rs     |  72 ++++++++++-
 core/server/src/shard/builder.rs                   |   2 +-
 core/server/src/shard/mod.rs                       | 132 ++++++++++++++++++++-
 core/server/src/shard/transmission/message.rs      |  14 ++-
 core/server/src/tcp/connection_handler.rs          |  40 +++++--
 core/server/src/tcp/tcp_listener.rs                |  27 +++--
 core/server/src/tcp/tcp_sender.rs                  |   6 +
 8 files changed, 262 insertions(+), 33 deletions(-)

diff --git a/core/common/src/error/iggy_error.rs b/core/common/src/error/iggy_error.rs
index 53b4eb5bd..a68e0967e 100644
--- a/core/common/src/error/iggy_error.rs
+++ b/core/common/src/error/iggy_error.rs
@@ -158,6 +158,8 @@ pub enum IggyError {
     InvalidClientId = 101,
     #[error("Connection closed")]
     ConnectionClosed = 206,
+    #[error("Socket transferred to another shard")]
+    SocketTransferred = 207,
     #[error("Cannot parse header kind from {0}")]
     CannotParseHeaderKind(String) = 209,
     #[error("HTTP response error, status: {0}, body: {1}")]
diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs b/core/server/src/binary/handlers/messages/send_messages_handler.rs
index 6d155bcac..5813c2588 100644
--- a/core/server/src/binary/handlers/messages/send_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs
@@ -31,9 +31,10 @@ use compio::buf::{IntoInner as _, IoBuf};
 use iggy_common::Identifier;
 use iggy_common::Sizeable;
 use iggy_common::{INDEX_SIZE, IdKind};
-use iggy_common::{IggyError, Partitioning, SendMessages, Validatable};
+use iggy_common::{IggyError, Partitioning, PartitioningKind, SendMessages, Validatable};
+use nix::libc;
 use std::rc::Rc;
-use tracing::instrument;
+use tracing::{info, instrument};
 
 impl ServerCommandHandler for SendMessages {
     fn code(&self) -> u32 {
@@ -107,6 +108,72 @@ impl ServerCommandHandler for SendMessages {
             .await?
             .into_inner();
 
+        let user_id = session.get_user_id();
+
+        let stream = shard.get_stream(&self.stream_id)?;
+        let topic = stream.get_topic(&self.topic_id)?;
+        let partition_id = match self.partitioning.kind {
+            PartitioningKind::PartitionId => {
+                u32::from_le_bytes(self.partitioning.value[..4].try_into().unwrap())
+            }
+            _ => 0,
+        };
+
+        let namespace = IggyNamespace::new(stream.stream_id, topic.topic_id, partition_id);
+        if let Some(target_shard) = shard.find_shard(&namespace) {
+            if target_shard.id() != shard.id {
+                use crate::tcp::tcp_sender::TcpSender;
+                use std::os::fd::AsRawFd;
+
+                if let SenderKind::Tcp(TcpSender { stream: tcp_stream }) = sender {
+                    let raw_fd = tcp_stream.as_raw_fd();
+                    let new_fd = unsafe { libc::dup(raw_fd) };
+                    if new_fd == -1 {
+                        return Err(IggyError::CannotReadMessagePayload);
+                    }
+
+                    let mut initial_data = Vec::new();
+                    initial_data.extend_from_slice(&(length).to_le_bytes());
+                    initial_data.extend_from_slice(&(self.code()).to_le_bytes());
+                    initial_data.extend_from_slice(&(metadata_size).to_le_bytes());
+                    initial_data.extend_from_slice(&metadata_buf[..]);
+                    initial_data.extend_from_slice(&indexes_buffer[..]);
+                    initial_data.extend_from_slice(&messages_buffer[..]);
+
+                    let payload = ShardRequestPayload::SocketTransfer {
+                        fd: new_fd,
+                        from_shard: shard.id,
+                        client_id: session.client_id,
+                        user_id: session.get_user_id(),
+                        ip_address: session.ip_address,
+                        initial_data,
+                    };
+                    let request =
+                        ShardRequest::new(stream.stream_id, topic.topic_id, partition_id, payload);
+                    let message = ShardMessage::Request(request);
+
+                    match target_shard.send_request(message).await? {
+                        ShardResponse::SendMessages => {
+                            info!(
+                                "Socket transferred from shard {} to shard {}",
+                                shard.id,
+                                target_shard.id()
+                            );
+                            // Return SocketTransferred to signal clean exit without sending response
+                            // The socket has been transferred, so the original handler should stop
+                            return Err(IggyError::SocketTransferred);
+                        }
+                        _ => {
+                            unsafe {
+                                libc::close(new_fd);
+                            }
+                            return Err(IggyError::ShardCommunicationError(target_shard.id()));
+                        }
+                    }
+                }
+            }
+        }
+
         let indexes = IggyIndexesMut::from_bytes(indexes_buffer, 0);
         let batch = IggyMessagesBatchMut::from_indexes_and_messages(
             messages_count,
@@ -115,7 +182,6 @@ impl ServerCommandHandler for SendMessages {
         );
         batch.validate()?;
 
-        let user_id = session.get_user_id();
         shard
             .append_messages(
                 user_id,
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index c49cdeb66..1439f9055 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -17,7 +17,7 @@
  */
 
 use std::{
-    cell::Cell,
+    cell::{Cell, RefCell},
     rc::Rc,
     sync::{Arc, atomic::AtomicBool},
 };
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 019358680..46aa8004e 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -37,12 +37,16 @@ use namespace::IggyNamespace;
 use std::{
     cell::{Cell, RefCell},
     future::Future,
+    io::ErrorKind,
+    net::SocketAddr,
+    os::fd::{FromRawFd, RawFd},
     pin::Pin,
     rc::Rc,
     str::FromStr,
     sync::{
         Arc,
         atomic::{AtomicBool, AtomicU32, Ordering},
+        mpsc,
     },
     time::{Duration, Instant},
 };
@@ -90,6 +94,15 @@ static USER_ID: AtomicU32 = AtomicU32::new(1);
 
 type Task = Pin<Box<dyn Future<Output = Result<(), IggyError>>>>;
 
+#[derive(Debug)]
+pub struct SocketTransfer {
+    pub fd: RawFd,
+    pub from_shard: u16,
+    pub to_shard: u16,
+    pub initial_data: Vec<u8>,
+    pub session: Rc<Session>,
+}
+
 pub(crate) struct Shard {
     id: u16,
     connection: ShardConnector<ShardFrame>,
@@ -103,6 +116,10 @@ impl Shard {
         }
     }
 
+    pub fn id(&self) -> u16 {
+        self.id
+    }
+
     pub async fn send_request(&self, message: ShardMessage) -> Result<ShardResponse, IggyError> {
         let (sender, receiver) = async_channel::bounded(1);
         self.connection
@@ -540,7 +557,10 @@ impl IggyShard {
         self.shards.len() as u32
     }
 
-    pub async fn handle_shard_message(&self, message: ShardMessage) -> Option<ShardResponse> {
+    pub async fn handle_shard_message(
+        self: &Rc<Self>,
+        message: ShardMessage,
+    ) -> Option<ShardResponse> {
         match message {
             ShardMessage::Request(request) => match self.handle_request(request).await {
                 Ok(response) => Some(response),
@@ -553,21 +573,48 @@ impl IggyShard {
         }
     }
 
-    async fn handle_request(&self, request: ShardRequest) -> Result<ShardResponse, IggyError> {
-        let stream = self.get_stream(&Identifier::numeric(request.stream_id)?)?;
-        let topic = stream.get_topic(&Identifier::numeric(request.topic_id)?)?;
-        let partition_id = request.partition_id;
+    async fn handle_request(
+        self: &Rc<Self>,
+        request: ShardRequest,
+    ) -> Result<ShardResponse, IggyError> {
         match request.payload {
             ShardRequestPayload::SendMessages { batch } => {
+                let stream = self.get_stream(&Identifier::numeric(request.stream_id)?)?;
+                let topic = stream.get_topic(&Identifier::numeric(request.topic_id)?)?;
+                let partition_id = request.partition_id;
                 topic.append_messages(partition_id, batch).await?;
                 Ok(ShardResponse::SendMessages)
             }
             ShardRequestPayload::PollMessages { args, consumer } => {
+                let stream = self.get_stream(&Identifier::numeric(request.stream_id)?)?;
+                let topic = stream.get_topic(&Identifier::numeric(request.topic_id)?)?;
+                let partition_id = request.partition_id;
                 let (metadata, batch) = topic
                     .get_messages(consumer, partition_id, args.strategy, args.count)
                     .await?;
                 Ok(ShardResponse::PollMessages((metadata, batch)))
             }
+            ShardRequestPayload::SocketTransfer {
+                fd,
+                from_shard,
+                client_id,
+                user_id,
+                ip_address,
+                initial_data,
+            } => {
+                // Process the transfer and return whether it succeeded
+                self.handle_socket_transfer(
+                    fd,
+                    from_shard,
+                    client_id,
+                    user_id,
+                    ip_address,
+                    initial_data,
+                )
+                .await?;
+                // Return success so the original shard knows the transfer worked
+                Ok(ShardResponse::SendMessages)
+            }
         }
     }
 
@@ -928,7 +975,7 @@ impl IggyShard {
         responses
     }
 
-    fn find_shard(&self, namespace: &IggyNamespace) -> Option<&Shard> {
+    pub fn find_shard(&self, namespace: &IggyNamespace) -> Option<&Shard> {
         let shards_table = self.shards_table.borrow();
         shards_table.get(namespace).map(|shard_info| {
             self.shards
@@ -1014,4 +1061,77 @@ impl IggyShard {
             })?;
         Ok(user_id)
     }
+
+    async fn handle_socket_transfer(
+        self: &Rc<Self>,
+        fd: RawFd,
+        from_shard: u16,
+        client_id: u32,
+        user_id: u32,
+        ip_address: SocketAddr,
+        initial_data: Vec<u8>,
+    ) -> Result<(), IggyError> {
+        use crate::binary::sender::SenderKind;
+        use crate::streaming::clients::client_manager::Transport;
+        use crate::tcp::connection_handler::handle_connection;
+        use compio::net::TcpStream;
+
+        info!(
+            "Shard {} received socket transfer from shard {} for client {} (user {})",
+            self.id, from_shard, client_id, user_id
+        );
+
+        let std_stream = unsafe { std::net::TcpStream::from_raw_fd(fd) };
+        let stream = TcpStream::from_std(std_stream)
+            .map_err(|e| IggyError::ShardCommunicationError(self.id))?;
+
+        let mut sender = SenderKind::get_tcp_sender(stream);
+
+        let transport = Transport::Tcp;
+        let session = self.add_client(&ip_address, transport);
+
+        // Set the authenticated user ID on the session
+        if user_id > 0 {
+            session.set_user_id(user_id);
+        }
+
+        self.add_active_session(session.clone());
+
+        // Process the SendMessages command from initial_data and send response
+        // The initial_data contains the full SendMessages request that needs to be processed
+        // For now, just send OK response to continue the connection
+        sender.send_empty_ok_response().await?;
+
+        let conn_stop_receiver = self.task_registry.add_connection(session.client_id);
+        let shard_for_conn = self.clone();
+        let address = ip_address;
+
+        self.task_registry.spawn_tracked(async move {
+            if let Err(error) = handle_connection(&session, &mut sender, &shard_for_conn, conn_stop_receiver).await {
+                crate::tcp::connection_handler::handle_error(error);
+            }
+            shard_for_conn.task_registry.remove_connection(&session.client_id);
+
+            if let Err(error) = sender.shutdown().await {
+                // Ignore "Transport endpoint is not connected" errors for transferred sockets
+                if let crate::server_error::ServerError::IoError(io_error) = &error {
+                    if io_error.kind() == std::io::ErrorKind::NotConnected {
+                        info!(
+                            "Transferred TCP stream for client: {client_id}, address: {address} was already closed."
+                        );
+                        return;
+                    }
+                }
+                error!(
+                    "Failed to shutdown TCP stream for client: {client_id}, address: {address}. {error}"
+                );
+            } else {
+                info!(
+                    "Successfully closed TCP stream for client: {client_id}, address: {address}."
+                );
+            }
+        });
+
+        Ok(())
+    }
 }
diff --git a/core/server/src/shard/transmission/message.rs b/core/server/src/shard/transmission/message.rs
index fe369badd..0337f7ad7 100644
--- a/core/server/src/shard/transmission/message.rs
+++ b/core/server/src/shard/transmission/message.rs
@@ -1,4 +1,4 @@
-use std::{rc::Rc, sync::Arc};
+use std::{net::SocketAddr, os::fd::RawFd, rc::Rc, sync::Arc};
 
 use iggy_common::PollingStrategy;
 
@@ -24,7 +24,9 @@ use crate::{
         system::messages::PollingArgs,
         transmission::{event::ShardEvent, frame::ShardResponse},
     },
-    streaming::{polling_consumer::PollingConsumer, segments::IggyMessagesBatchMut},
+    streaming::{
+        polling_consumer::PollingConsumer, segments::IggyMessagesBatchMut, session::Session,
+    },
 };
 
 pub enum ShardSendRequestResult {
@@ -72,6 +74,14 @@ pub enum ShardRequestPayload {
         consumer: PollingConsumer,
         args: PollingArgs,
     },
+    SocketTransfer {
+        fd: RawFd,
+        from_shard: u16,
+        client_id: u32,
+        user_id: u32,
+        ip_address: SocketAddr,
+        initial_data: Vec<u8>,
+    },
 }
 
 impl From<ShardRequest> for ShardMessage {
diff --git a/core/server/src/tcp/connection_handler.rs b/core/server/src/tcp/connection_handler.rs
index 1f1bb4827..4aea2a622 100644
--- a/core/server/src/tcp/connection_handler.rs
+++ b/core/server/src/tcp/connection_handler.rs
@@ -95,17 +95,32 @@ pub(crate) async fn handle_connection(
                 );
             }
             Err(error) => {
-                error!("Command was not handled successfully, session: {session}, error: {error}.");
-                if let IggyError::ClientNotFound(_) = error {
-                    sender.send_error_response(error).await?;
-                    debug!("TCP error response was sent to: {session}.");
-                    error!("Session: {session} will be deleted.");
-                    return Err(ConnectionError::from(IggyError::ClientNotFound(
-                        session.client_id,
-                    )));
-                } else {
-                    sender.send_error_response(error).await?;
-                    debug!("TCP error response was sent to: {session}.");
+                match error {
+                    IggyError::SocketTransferred => {
+                        // Socket was transferred to another shard, exit cleanly without sending response
+                        info!(
+                            "Socket transferred for session: {session}, closing original handler."
+                        );
+                        return Err(ConnectionError::from(IggyError::SocketTransferred));
+                    }
+                    IggyError::ClientNotFound(_) => {
+                        error!(
+                            "Command was not handled successfully, session: {session}, error: {error}."
+                        );
+                        sender.send_error_response(error).await?;
+                        debug!("TCP error response was sent to: {session}.");
+                        error!("Session: {session} will be deleted.");
+                        return Err(ConnectionError::from(IggyError::ClientNotFound(
+                            session.client_id,
+                        )));
+                    }
+                    _ => {
+                        error!(
+                            "Command was not handled successfully, session: {session}, error: {error}."
+                        );
+                        sender.send_error_response(error).await?;
+                        debug!("TCP error response was sent to: {session}.");
+                    }
                 }
             }
         }
@@ -135,6 +150,9 @@ pub(crate) fn handle_error(error: ConnectionError) {
             IggyError::ConnectionClosed => {
                 debug!("Client closed connection.");
             }
+            IggyError::SocketTransferred => {
+                info!("Socket successfully transferred to another shard.");
+            }
             _ => {
                 error!("Failure in internal SDK call: {sdk_error}");
             }
diff --git a/core/server/src/tcp/tcp_listener.rs b/core/server/src/tcp/tcp_listener.rs
index 6ad2cf08c..cb7619392 100644
--- a/core/server/src/tcp/tcp_listener.rs
+++ b/core/server/src/tcp/tcp_listener.rs
@@ -117,19 +117,26 @@ pub async fn start(
 
                         let shard_for_conn = shard_clone.clone();
                         shard_clone.task_registry.spawn_tracked(async move {
-                            if let Err(error) = handle_connection(&session, &mut sender, &shard_for_conn, conn_stop_receiver).await {
+                            let should_shutdown = if let Err(error) = handle_connection(&session, &mut sender, &shard_for_conn, conn_stop_receiver).await {
+                                // Check if socket was transferred
+                                let is_transferred = matches!(&error, crate::server_error::ConnectionError::SdkError(e) if e.as_code() == iggy_common::IggyError::SocketTransferred.as_code());
                                 handle_error(error);
-                            }
+                                !is_transferred  // Don't shutdown if socket was transferred
+                            } else {
+                                true  // Normal completion, shutdown the socket
+                            };
                             shard_for_conn.task_registry.remove_connection(&client_id);
 
-                            if let Err(error) = sender.shutdown().await {
-                                error!(
-                                    "Failed to shutdown TCP stream for client: {client_id}, address: {address}. {error}"
-                                );
-                            } else {
-                                info!(
-                                    "Successfully closed TCP stream for client: {client_id}, address: {address}."
-                                );
+                            if should_shutdown {
+                                if let Err(error) = sender.shutdown().await {
+                                    error!(
+                                        "Failed to shutdown TCP stream for client: {client_id}, address: {address}. {error}"
+                                    );
+                                } else {
+                                    info!(
+                                        "Successfully closed TCP stream for client: {client_id}, address: {address}."
+                                    );
+                                }
                             }
                         });
                     }
diff --git a/core/server/src/tcp/tcp_sender.rs b/core/server/src/tcp/tcp_sender.rs
index ccdc95d24..06965ee09 100644
--- a/core/server/src/tcp/tcp_sender.rs
+++ b/core/server/src/tcp/tcp_sender.rs
@@ -31,6 +31,12 @@ pub struct TcpSender {
     pub(crate) stream: TcpStream,
 }
 
+impl TcpSender {
+    pub fn into_stream(self) -> TcpStream {
+        self.stream
+    }
+}
+
 impl Sender for TcpSender {
     async fn read<B: IoBufMut>(&mut self, buffer: B) -> Result<B, IggyError> {
         sender::read(&mut self.stream, buffer).await

Reply via email to