This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch rebase_master
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit e1e31ea824212b57e780723209eaf0f80dfbf5f5
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Tue Jul 1 11:14:39 2025 +0200

    feat(io_uring): fix tcp tls server (#1952)
    
    Closes #1936
---
 Cargo.lock                              |  56 ++----------
 core/server/Cargo.toml                  |   2 +-
 core/server/src/binary/sender.rs        |   5 +-
 core/server/src/tcp/tcp_listener.rs     |  14 +--
 core/server/src/tcp/tcp_server.rs       |  16 +++-
 core/server/src/tcp/tcp_tls_listener.rs | 153 +++++++++++++++++---------------
 core/server/src/tcp/tcp_tls_sender.rs   |   5 +-
 7 files changed, 110 insertions(+), 141 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 6898bb77..ca282813 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1686,16 +1686,6 @@ dependencies = [
  "version_check",
 ]
 
-[[package]]
-name = "core-foundation"
-version = "0.9.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f"
-dependencies = [
- "core-foundation-sys",
- "libc",
-]
-
 [[package]]
 name = "core-foundation"
 version = "0.10.1"
@@ -4921,15 +4911,15 @@ dependencies = [
 ]
 
 [[package]]
-name = "monoio-native-tls"
+name = "monoio-rustls"
 version = "0.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b9022f5aaa19f9688f97bfcfa0c4a4318d424851995badb356674ca742652cdb"
+checksum = "6e31f422825bd7fb19957af6eaf89d7234ba143fcc0e515f5a2f526e332d1875"
 dependencies = [
  "bytes",
  "monoio",
  "monoio-io-wrapper",
- "native-tls",
+ "rustls",
  "thiserror 1.0.69",
 ]
 
@@ -4942,23 +4932,6 @@ dependencies = [
  "getrandom 0.2.16",
 ]
 
-[[package]]
-name = "native-tls"
-version = "0.2.14"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "87de3442987e9dbec73158d5c715e7ad9072fda936bb03d19d7fa10e00520f0e"
-dependencies = [
- "libc",
- "log",
- "openssl",
- "openssl-probe",
- "openssl-sys",
- "schannel",
- "security-framework 2.11.1",
- "security-framework-sys",
- "tempfile",
-]
-
 [[package]]
 name = "never-say-never"
 version = "6.6.666"
@@ -6631,7 +6604,7 @@ dependencies = [
  "openssl-probe",
  "rustls-pki-types",
  "schannel",
- "security-framework 3.2.0",
+ "security-framework",
 ]
 
 [[package]]
@@ -6659,7 +6632,7 @@ version = "0.5.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "19787cda76408ec5404443dc8b31795c87cd8fec49762dc75fa727740d34acc1"
 dependencies = [
- "core-foundation 0.10.1",
+ "core-foundation",
  "core-foundation-sys",
  "jni",
  "log",
@@ -6668,7 +6641,7 @@ dependencies = [
  "rustls-native-certs",
  "rustls-platform-verifier-android",
  "rustls-webpki",
- "security-framework 3.2.0",
+ "security-framework",
  "security-framework-sys",
  "webpki-root-certs 0.26.11",
  "windows-sys 0.59.0",
@@ -6816,19 +6789,6 @@ dependencies = [
  "zeroize",
 ]
 
-[[package]]
-name = "security-framework"
-version = "2.11.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02"
-dependencies = [
- "bitflags 2.9.1",
- "core-foundation 0.9.4",
- "core-foundation-sys",
- "libc",
- "security-framework-sys",
-]
-
 [[package]]
 name = "security-framework"
 version = "3.2.0"
@@ -6836,7 +6796,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "271720403f46ca04f7ba6f55d438f8bd878d6b8ca0a1046e8228c4145bcbb316"
 dependencies = [
  "bitflags 2.9.1",
- "core-foundation 0.10.1",
+ "core-foundation",
  "core-foundation-sys",
  "libc",
  "security-framework-sys",
@@ -7054,7 +7014,7 @@ dependencies = [
  "mockall",
  "moka",
  "monoio",
- "monoio-native-tls",
+ "monoio-rustls",
  "nix 0.30.1",
  "once_cell",
  "opentelemetry",
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 3e6c801e..4b330383 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -68,7 +68,7 @@ hash32 = "1.0.0"
 mimalloc = { workspace = true, optional = true }
 moka = { version = "0.12.10", features = ["future"] }
 monoio = { version = "0.2.4", features = ["mkdirat", "unlinkat", "renameat", "sync"] }
-monoio-native-tls = "0.4.0"
+monoio-rustls = "0.4.0"
 nix = { version = "0.30", features = ["fs"] }
 once_cell = "1.21.3"
 opentelemetry = { version = "0.30.0", features = ["trace", "logs"] }
diff --git a/core/server/src/binary/sender.rs b/core/server/src/binary/sender.rs
index adbf05ee..45066c75 100644
--- a/core/server/src/binary/sender.rs
+++ b/core/server/src/binary/sender.rs
@@ -25,10 +25,11 @@ use crate::{quic::quic_sender::QuicSender, server_error::ServerError};
 use bytes::BytesMut;
 use iggy_common::IggyError;
 use monoio::buf::IoBufMut;
+use monoio::io::{AsyncReadRent, AsyncWriteRent};
 use monoio::net::TcpStream;
-use monoio_native_tls::TlsStream;
 use nix::libc;
 use quinn::{RecvStream, SendStream};
+use monoio_rustls::{ServerTlsStream, TlsStream};
 
 macro_rules! forward_async_methods {
     (
@@ -84,7 +85,7 @@ impl SenderKind {
         Self::Tcp(TcpSender { stream })
     }
 
-    pub fn get_tcp_tls_sender(stream: TlsStream<TcpStream>) -> Self {
+    pub fn get_tcp_tls_sender(stream: ServerTlsStream<TcpStream>) -> Self {
         Self::TcpTls(TcpTlsSender { stream })
     }
 
diff --git a/core/server/src/tcp/tcp_listener.rs b/core/server/src/tcp/tcp_listener.rs
index 90b1eabb..4f528016 100644
--- a/core/server/src/tcp/tcp_listener.rs
+++ b/core/server/src/tcp/tcp_listener.rs
@@ -21,25 +21,15 @@ use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::streaming::clients::client_manager::Transport;
 use crate::tcp::connection_handler::{handle_connection, handle_error};
-use crate::tcp::tcp_socket;
 use futures::FutureExt;
 use iggy_common::IggyError;
+use socket2::Socket;
 use std::net::SocketAddr;
 use std::rc::Rc;
 use std::time::Duration;
 use tracing::{error, info};
 
-pub async fn start(server_name: &'static str, shard: Rc<IggyShard>) -> 
Result<(), IggyError> {
-    let ip_v6 = shard.config.tcp.ipv6;
-    let socket_config = &shard.config.tcp.socket;
-    let addr: SocketAddr = shard
-        .config
-        .tcp
-        .address
-        .parse()
-        .expect("Failed to parse TCP address");
-
-    let socket = tcp_socket::build(ip_v6, socket_config);
+pub async fn start(server_name: &'static str, addr: SocketAddr, socket: 
Socket, shard: Rc<IggyShard>) -> Result<(), IggyError> {
     socket
         .bind(&addr.into())
         .expect("Failed to bind TCP listener");
diff --git a/core/server/src/tcp/tcp_server.rs b/core/server/src/tcp/tcp_server.rs
index e558d08f..ac019bb0 100644
--- a/core/server/src/tcp/tcp_server.rs
+++ b/core/server/src/tcp/tcp_server.rs
@@ -17,8 +17,9 @@
  */
 
 use crate::shard::IggyShard;
-use crate::tcp::tcp_listener;
+use crate::tcp::{tcp_listener, tcp_socket, tcp_tls_listener};
 use iggy_common::IggyError;
+use std::net::SocketAddr;
 use std::rc::Rc;
 use tracing::info;
 
@@ -30,11 +31,20 @@ pub async fn spawn_tcp_server(shard: Rc<IggyShard>) -> Result<(), IggyError> {
     } else {
         "Iggy TCP"
     };
+    let ip_v6 = shard.config.tcp.ipv6;
+    let socket_config = &shard.config.tcp.socket;
+    let addr: SocketAddr = shard
+        .config
+        .tcp
+        .address
+        .parse()
+        .expect("Failed to parse TCP address");
+    let socket = tcp_socket::build(ip_v6, socket_config);
     info!("Initializing {server_name} server...");
     // TODO: Fixme -- storing addr of the server inside of the config for 
integration tests...
     match shard.config.tcp.tls.enabled {
-        true => unimplemented!("TLS support is not implemented yet"),
-        false => tcp_listener::start(server_name, shard.clone()).await?,
+        true => tcp_tls_listener::start(server_name, addr, socket, 
shard.clone()).await?,
+        false => tcp_listener::start(server_name, addr, socket, 
shard.clone()).await?,
     };
 
     Ok(())
diff --git a/core/server/src/tcp/tcp_tls_listener.rs b/core/server/src/tcp/tcp_tls_listener.rs
index 53df4b0b..2c81eb60 100644
--- a/core/server/src/tcp/tcp_tls_listener.rs
+++ b/core/server/src/tcp/tcp_tls_listener.rs
@@ -19,33 +19,31 @@
 use crate::binary::sender::SenderKind;
 use crate::configs::tcp::TcpTlsConfig;
 use crate::shard::IggyShard;
+use crate::shard::transmission::event::ShardEvent;
 use crate::streaming::clients::client_manager::Transport;
 use crate::tcp::connection_handler::{handle_connection, handle_error};
-use monoio_native_tls::TlsAcceptor;
+use futures::FutureExt;
+use iggy_common::IggyError;
+use monoio_rustls::TlsAcceptor;
 use rustls::ServerConfig;
 use rustls::pki_types::{CertificateDer, PrivateKeyDer};
 use rustls_pemfile::{certs, private_key};
+use socket2::Socket;
 use std::io::BufReader;
 use std::net::SocketAddr;
 use std::rc::Rc;
 use std::sync::Arc;
-use tokio::net::TcpSocket;
-use tokio::sync::oneshot;
+use std::time::Duration;
 use tracing::{error, info};
 
 pub(crate) async fn start(
-    address: &str,
-    config: TcpTlsConfig,
-    socket: TcpSocket,
-    system: Rc<IggyShard>,
-) -> SocketAddr {
-    //TODO: Fixme
-    todo!();
-    /*
-    let address = address.to_string();
-    let (tx, rx) = oneshot::channel();
-    tokio::spawn(async move {
+    server_name: &'static str,
+    addr: SocketAddr,
+    socket: Socket,
+    shard: Rc<IggyShard>,
+) -> Result<(), IggyError> {
         let _ = 
rustls::crypto::aws_lc_rs::default_provider().install_default();
+        let config = &shard.config.tcp.tls;
 
         let (certs, key) =
             if config.self_signed && 
!std::path::Path::new(&config.cert_file).exists() {
@@ -64,78 +62,87 @@ pub(crate) async fn start(
 
         let acceptor = TlsAcceptor::from(Arc::new(server_config));
 
-        let addr = address.parse();
-        if addr.is_err() {
-            panic!("Unable to parse address {address:?}");
-        }
-
-        let addr = addr.unwrap();
         socket
-            .bind(addr)
+            .bind(&addr.into())
             .unwrap_or_else(|e| panic!("Unable to bind socket to address 
'{addr}': {e}",));
 
-        let listener = socket
-            .listen(1024)
-            .unwrap_or_else(|e| panic!("Unable to start TCP TLS server on 
'{address}': {e}",));
-
-        let local_addr = listener
-            .local_addr()
-            .unwrap_or_else(|e| panic!("Failed to get local address for TCP 
TLS listener: {e}",));
-
-        tx.send(local_addr).unwrap_or_else(|_| {
-            panic!("Failed to send the local address '{local_addr}' for TCP 
TLS listener")
-        });
+        let listener: std::net::TcpListener = socket.into();
+        let listener = monoio::net::TcpListener::from_std(listener).unwrap();
+        info!("{server_name} server has started on: {:?}", addr);
 
         loop {
-            match listener.accept().await {
-                Ok((stream, address)) => {
-                    info!("Accepted new TCP TLS connection: {}", address);
-                    let session = system
-                        .read()
-                        .await
-                        .add_client(&address, Transport::Tcp)
-                        .await;
-
-                    let client_id = session.client_id;
-                    let acceptor = acceptor.clone();
-                    let system_clone = system.clone();
-                    match acceptor.accept(stream).await {
-                        Ok(stream) => {
-                            let mut sender = 
SenderKind::get_tcp_tls_sender(stream);
-                            tokio::spawn(async move {
-                                if let Err(error) =
-                                    handle_connection(session, &mut sender, 
system_clone.clone())
-                                        .await
-                                {
-                                    handle_error(error);
-                                    
system_clone.read().await.delete_client(client_id).await;
-                                    if let Err(error) = 
sender.shutdown().await {
-                                        error!(
-                                            "Failed to shutdown TCP stream for 
client: {client_id}, address: {address}. {error}"
-                                        );
-                                    } else {
-                                        info!(
-                                            "Successfully closed TCP stream 
for client: {client_id}, address: {address}."
-                                        );
+            let shutdown_check = async {
+                loop {
+                    if shard.is_shutting_down() {
+                        return;
+                    }
+                    monoio::time::sleep(Duration::from_millis(100)).await;
+                }
+            };
+
+            let accept_future = listener.accept();
+            futures::select! {
+                _ = shutdown_check.fuse() => {
+                    info!("TCP TLS server detected shutdown flag, no longer 
accepting connections");
+                    break;
+                }
+                result = accept_future.fuse() => {
+                    match result {
+                        Ok((stream, address)) => {
+                            if shard.is_shutting_down() {
+                                info!("Rejecting new TLS connection from {} 
during shutdown", address);
+                                continue;
+                            }
+                            let shard_clone = shard.clone();
+                            info!("Accepted new TCP TLS connection: {}", 
address);
+                            let transport = Transport::Tcp;
+                            let session = shard_clone.add_client(&address, 
transport);
+                            //TODO: Those can be shared with other shards.
+                            shard_clone.add_active_session(session.clone());
+                            // Broadcast session to all shards.
+                            let event = ShardEvent::NewSession { address, 
transport };
+                            // TODO: Fixme look inside of 
broadcast_event_to_all_shards method.
+                            let _responses = 
shard_clone.broadcast_event_to_all_shards(event.into());
+
+                            let client_id = session.client_id;
+                            info!("Created new session: {session}");
+                            let acceptor = acceptor.clone();
+                            
+                            let conn_stop_receiver = 
shard_clone.task_registry.add_connection(client_id);
+
+                            let shard_for_conn = shard_clone.clone();
+                            shard_clone.task_registry.spawn_tracked(async move 
{
+                                match acceptor.accept(stream).await {
+                                    Ok(tls_stream) => {
+                                        let mut sender = 
SenderKind::get_tcp_tls_sender(tls_stream.into());
+                                        if let Err(error) = 
handle_connection(&session, &mut sender, &shard_for_conn, 
conn_stop_receiver).await {
+                                            handle_error(error);
+                                        }
+                                        
shard_for_conn.task_registry.remove_connection(&client_id);
+
+                                        if let Err(error) = 
sender.shutdown().await {
+                                            error!(
+                                                "Failed to shutdown TCP TLS 
stream for client: {client_id}, address: {address}. {error}"
+                                            );
+                                        } else {
+                                            info!(
+                                                "Successfully closed TCP TLS 
stream for client: {client_id}, address: {address}."
+                                            );
+                                        }
+                                    }
+                                    Err(e) => {
+                                        error!("Failed to accept TLS 
connection from '{address}': {e}");
+                                        
shard_for_conn.task_registry.remove_connection(&client_id);
                                     }
                                 }
                             });
                         }
-                        Err(e) => {
-                            error!("Failed to accept TLS connection from 
'{address}': {e}");
-                            
system_clone.read().await.delete_client(client_id).await;
-                        }
+                        Err(error) => error!("Unable to accept TCP TLS socket. 
{error}"),
                     }
                 }
-                Err(error) => error!("Unable to accept TCP TLS socket. 
{error}"),
             }
         }
-    });
-    match rx.await {
-        Ok(addr) => addr,
-        Err(_) => panic!("Failed to get the local address for TCP TLS 
listener."),
-    }
-    */
+    Ok(())
 }
 
 fn generate_self_signed_cert()
diff --git a/core/server/src/tcp/tcp_tls_sender.rs b/core/server/src/tcp/tcp_tls_sender.rs
index 167427c1..b214e5a6 100644
--- a/core/server/src/tcp/tcp_tls_sender.rs
+++ b/core/server/src/tcp/tcp_tls_sender.rs
@@ -25,12 +25,13 @@ use iggy_common::IggyError;
 use monoio::buf::IoBufMut;
 use monoio::io::AsyncWriteRent;
 use monoio::net::TcpStream;
-use monoio_native_tls::TlsStream;
+use monoio_rustls::ServerTlsStream;
+//use tokio_rustls::server::TlsStream;
 use nix::libc;
 
 #[derive(Debug)]
 pub struct TcpTlsSender {
-    pub(crate) stream: TlsStream<TcpStream>,
+    pub(crate) stream: ServerTlsStream<TcpStream>,
 }
 
 impl Sender for TcpTlsSender {

Reply via email to