This is an automated email from the ASF dual-hosted git repository. bashirbekov pushed a commit to branch add-sync-client in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 2ed7f93e0f7e0040f7904fd307028fecdc801c76 Author: haze518 <[email protected]> AuthorDate: Sun Sep 14 14:10:24 2025 +0600 del --- core/common/src/error/iggy_error.rs | 6 + core/sdk/src/connection/mod.rs | 1 + core/sdk/src/connection/transport.rs | 62 ++++++++ core/sdk/src/lib.rs | 2 + core/sdk/src/protocol/core.rs | 280 +++++++++++++++++++++++++++++++++++ core/sdk/src/protocol/mod.rs | 1 + core/sdk/src/tcp/mod.rs | 2 + core/sdk/src/tcp/tcp_client_sync.rs | 181 ++++++++++++++++++++++ 8 files changed, 535 insertions(+) diff --git a/core/common/src/error/iggy_error.rs b/core/common/src/error/iggy_error.rs index cc8a284b..2597e154 100644 --- a/core/common/src/error/iggy_error.rs +++ b/core/common/src/error/iggy_error.rs @@ -459,6 +459,12 @@ pub enum IggyError { CannotReadIndexPosition = 10011, #[error("Cannot read index timestamp")] CannotReadIndexTimestamp = 10012, + #[error("Max number of retry has exceeded")] + MaxRetriesExceeded = 10050, + #[error("Connection missed socket")] + ConnectionMissedSocket = 10051, + #[error("Incorrect connection state")] + IncorrectConnectionState = 10052, } impl IggyError { diff --git a/core/sdk/src/connection/mod.rs b/core/sdk/src/connection/mod.rs new file mode 100644 index 00000000..3feff26d --- /dev/null +++ b/core/sdk/src/connection/mod.rs @@ -0,0 +1 @@ +pub(crate) mod transport; diff --git a/core/sdk/src/connection/transport.rs b/core/sdk/src/connection/transport.rs new file mode 100644 index 00000000..cd4a8071 --- /dev/null +++ b/core/sdk/src/connection/transport.rs @@ -0,0 +1,62 @@ +use std::io::{self, Read, Write}; +use std::net::{SocketAddr, TcpStream}; +use std::str::FromStr; +use std::sync::Arc; + +use iggy_common::{AutoLogin, IggyDuration, TcpClientConfig}; + +pub trait ClientConfig { + fn server_address(&self) -> SocketAddr; + fn auto_login(&self) -> AutoLogin; + fn reconnection_reestablish_after(&self) -> IggyDuration; + fn reconnection_max_retries(&self) -> Option<u32>; + fn heartbeat_interval(&self) -> IggyDuration; +} + +impl ClientConfig for TcpClientConfig { + fn auto_login(&self) -> AutoLogin { + self.auto_login.clone() + } + + fn heartbeat_interval(&self) -> IggyDuration { + self.heartbeat_interval + } + + fn reconnection_max_retries(&self) -> Option<u32> { + self.reconnection.max_retries + } + + fn reconnection_reestablish_after(&self) -> IggyDuration { + self.reconnection.reestablish_after + } + + fn server_address(&self) -> SocketAddr { + SocketAddr::from_str(&self.server_address).unwrap() + } +} + +pub trait Transport { + type Stream: Read + Write; + type Config: ClientConfig; + + fn connect( + cfg: Arc<Self::Config>, // TODO maybe should remove arc + server_address: SocketAddr, + ) -> io::Result<Self::Stream>; +} + +pub struct TcpTransport; + +impl Transport for TcpTransport { + type Stream = TcpStream; + type Config = TcpClientConfig; + + fn connect( + cfg: Arc<Self::Config>, // TODO maybe should remove arc + server_address: SocketAddr, + ) -> io::Result<Self::Stream> { + let stream = TcpStream::connect(server_address)?; + stream.set_nodelay(cfg.nodelay)?; + Ok(stream) + } +} diff --git a/core/sdk/src/lib.rs b/core/sdk/src/lib.rs index c315fd5e..52e7e118 100644 --- a/core/sdk/src/lib.rs +++ b/core/sdk/src/lib.rs @@ -26,3 +26,5 @@ pub mod prelude; pub mod quic; pub mod stream_builder; pub mod tcp; +pub mod protocol; +pub mod connection; diff --git a/core/sdk/src/protocol/core.rs b/core/sdk/src/protocol/core.rs new file mode 100644 index 00000000..95a8f697 --- /dev/null +++ b/core/sdk/src/protocol/core.rs @@ -0,0 +1,280 @@ +use std::{ + collections::VecDeque, + net::SocketAddr, +}; + +use bytes::{Buf, BufMut, Bytes, BytesMut}; +use iggy_common::{AutoLogin, ClientState, Credentials, IggyDuration, IggyError, IggyTimestamp}; +use tracing::{debug, info, warn}; + +const REQUEST_INITIAL_BYTES_LENGTH: usize = 4; +const REQUEST_HEADER_BYTES: usize = 8; +const RESPONSE_INITIAL_BYTES_LENGTH: usize = 8; +#[derive(Debug)] +pub struct ProtocolCoreConfig { + pub auto_login: AutoLogin, + pub reestablish_after: IggyDuration, + pub max_retries: Option<u32>, +} + +#[derive(Debug)] +pub enum ControlAction { + Connect(SocketAddr), + Wait(IggyDuration), + Authenticate { username: String, password: String }, + Noop, + Error(IggyError), +} + +pub struct TxBuf { + pub header: [u8; 8], + pub payload: Bytes, + pub request_id: u64, +} + +impl TxBuf { + #[inline] + pub fn total_len(&self) -> usize { + REQUEST_HEADER_BYTES + self.payload.len() + } +} + +#[derive(Debug)] +pub struct ProtocolCore { + pub state: ClientState, + config: ProtocolCoreConfig, + last_connect_attempt: Option<IggyTimestamp>, + pub retry_count: u32, + next_request_id: u64, + pending_sends: VecDeque<(u32, Bytes, u64)>, + sent_order: VecDeque<u64>, + auth_pending: bool, + auth_request_id: Option<u64>, + server_address: Option<SocketAddr>, + last_auth_result: Option<Result<(), IggyError>>, +} + +impl ProtocolCore { + pub fn new(config: ProtocolCoreConfig) -> Self { + Self { + state: ClientState::Disconnected, + config, + last_connect_attempt: None, + retry_count: 0, + next_request_id: 1, + pending_sends: VecDeque::new(), + sent_order: VecDeque::new(), + auth_pending: false, + auth_request_id: None, + server_address: None, + last_auth_result: None, + } + } + + pub fn poll_transmit(&mut self) -> Option<TxBuf> { + if let Some((code, payload, request_id)) = self.pending_sends.pop_front() { + let total_len = (payload.len() + REQUEST_INITIAL_BYTES_LENGTH) as u32; + self.sent_order.push_back(request_id); + + Some(TxBuf { + payload, + header: make_header(total_len, code), + request_id, + }) + } else { + None + } + } + + pub fn send(&mut self, code: u32, payload: Bytes) -> Result<u64, IggyError> { + match self.state { + ClientState::Shutdown => Err(IggyError::ClientShutdown), + ClientState::Disconnected | ClientState::Connecting => Err(IggyError::NotConnected), + ClientState::Connected | ClientState::Authenticating | ClientState::Authenticated => { + Ok(self.queue_send(code, payload)) + } + } + } + + fn queue_send(&mut self, code: u32, payload: Bytes) -> u64 { + let request_id = self.next_request_id; + self.next_request_id += 1; + self.pending_sends.push_back((code, payload, request_id)); + request_id + } + + pub fn process_incoming_with<F: FnMut(u64, u32, Bytes)>( + &mut self, + buf: &mut BytesMut, + mut f: F, + ) { + loop { + if buf.len() < RESPONSE_INITIAL_BYTES_LENGTH { + break; + } + let status = u32::from_le_bytes(buf[..4].try_into().unwrap()); + let length = u32::from_le_bytes(buf[4..8].try_into().unwrap()); + let total = RESPONSE_INITIAL_BYTES_LENGTH + length as usize; + if buf.len() < total { + break; + } + + buf.advance(RESPONSE_INITIAL_BYTES_LENGTH); + let payload = if length <= 1 { + Bytes::new() + } else { + buf.split_to(length as usize).freeze() + }; + if let Some(id) = self.on_response(status) { + f(id, status, payload); + } + } + } + + pub fn on_response(&mut self, status: u32) -> Option<u64> { + let request_id = self.sent_order.pop_front()?; + + if Some(request_id) == self.auth_request_id { + if status == 0 { + debug!("Authentication successful"); + self.state = ClientState::Authenticated; + self.auth_pending = false; + self.last_auth_result = Some(Ok(())); + } else { + warn!("Authentication failed with status: {}", status); + self.state = ClientState::Connected; + self.auth_pending = false; + self.last_auth_result = Some(Err(IggyError::Unauthenticated)); + } + self.auth_request_id = None; + } + + Some(request_id) + } + + pub fn poll(&mut self) -> ControlAction { + match self.state { + ClientState::Shutdown => ControlAction::Error(IggyError::ClientShutdown), + ClientState::Disconnected => ControlAction::Noop, + ClientState::Authenticated | ClientState::Authenticating | ClientState::Connected => { + ControlAction::Noop + } + ClientState::Connecting => { + let server_address = match self.server_address { + Some(addr) => addr, + None => return ControlAction::Error(IggyError::ConnectionMissedSocket), + }; + + if let Some(last) = self.last_connect_attempt { + let now = IggyTimestamp::now(); + let elapsed = now.as_micros().saturating_sub(last.as_micros()); + if elapsed < self.config.reestablish_after.as_micros() { + let remaining = + IggyDuration::from(self.config.reestablish_after.as_micros() - elapsed); + return ControlAction::Wait(remaining); + } + } + + if let Some(max_retries) = self.config.max_retries { + if self.retry_count >= max_retries { + return ControlAction::Error(IggyError::MaxRetriesExceeded); + } + } + + self.retry_count += 1; + self.last_connect_attempt = Some(IggyTimestamp::now()); + + return ControlAction::Connect(server_address); + } + } + } + + pub fn desire_connect(&mut self, server_address: SocketAddr) -> Result<(), IggyError> { + match self.state { + ClientState::Shutdown => return Err(IggyError::ClientShutdown), + ClientState::Connecting => return Ok(()), + ClientState::Connected | ClientState::Authenticating | ClientState::Authenticated => { + return Ok(()); + } + _ => { + self.state = ClientState::Connecting; + self.server_address = Some(server_address); + } + } + + Ok(()) + } + + pub fn on_connected(&mut self) -> Result<(), IggyError> { + debug!("Transport connected"); + if self.state != ClientState::Connecting { + return Err(IggyError::IncorrectConnectionState); + } + self.state = ClientState::Connected; + self.retry_count = 0; + + match &self.config.auto_login { + AutoLogin::Disabled => { + info!("Automatic sign-in is disabled."); + } + AutoLogin::Enabled(credentials) => { + if !self.auth_pending { + self.state = ClientState::Authenticating; + self.auth_pending = true; + + match credentials { + Credentials::UsernamePassword(username, password) => { + let auth_payload = encode_auth(&username, &password); + let auth_id = self.queue_send(0x0A, auth_payload); + self.auth_request_id = Some(auth_id); + } + _ => { + todo!("add PersonalAccessToken") + } + } + } + } + } + + Ok(()) + } + + pub fn disconnect(&mut self) { + debug!("Transport disconnected"); + self.state = ClientState::Disconnected; + self.auth_pending = false; + self.auth_request_id = None; + self.sent_order.clear(); + } + + pub fn shutdown(&mut self) { + self.state = ClientState::Shutdown; + self.auth_pending = false; + self.auth_request_id = None; + self.sent_order.clear(); + } + + pub fn should_wait_auth(&self) -> bool { + matches!(self.config.auto_login, AutoLogin::Enabled(_)) && self.auth_pending + } + + pub fn take_auth_result(&mut self) -> Option<Result<(), IggyError>> { + self.last_auth_result.take() + } +} + +fn encode_auth(username: &str, password: &str) -> Bytes { + let mut buf = BytesMut::new(); + buf.put_u32_le(username.len() as u32); + buf.put_slice(username.as_bytes()); + buf.put_u32_le(password.len() as u32); + buf.put_slice(password.as_bytes()); + buf.freeze() +} + +fn make_header(total_len: u32, code: u32) -> [u8; 8] { + let mut h = [0u8; 8]; + h[..4].copy_from_slice(&total_len.to_le_bytes()); + h[4..].copy_from_slice(&code.to_le_bytes()); + h +} diff --git a/core/sdk/src/protocol/mod.rs b/core/sdk/src/protocol/mod.rs new file mode 100644 index 00000000..90d7cc36 --- /dev/null +++ b/core/sdk/src/protocol/mod.rs @@ -0,0 +1 @@ +pub(crate) mod core; diff --git a/core/sdk/src/tcp/mod.rs b/core/sdk/src/tcp/mod.rs index c073ff0e..106cad71 100644 --- a/core/sdk/src/tcp/mod.rs +++ b/core/sdk/src/tcp/mod.rs @@ -22,3 +22,5 @@ pub(crate) mod tcp_connection_stream_kind; pub(crate) mod tcp_stream; pub(crate) mod tcp_tls_connection_stream; pub(crate) mod tcp_tls_verifier; + +pub(crate) mod tcp_client_sync; diff --git a/core/sdk/src/tcp/tcp_client_sync.rs b/core/sdk/src/tcp/tcp_client_sync.rs new file mode 100644 index 00000000..66c734ac --- /dev/null +++ b/core/sdk/src/tcp/tcp_client_sync.rs @@ -0,0 +1,181 @@ +use std::{ + fmt::Debug, io::{self, IoSlice, Read, Write}, net::SocketAddr, sync::{Arc, Mutex} +}; + +use bytes::BytesMut; +use iggy_binary_protocol::Client; +use iggy_common::{ClientState, IggyError, TcpClientConfig}; +use tokio::runtime::Runtime; +use tracing::{debug, error, trace}; + +use crate::{ + connection::transport::{ClientConfig, Transport}, + protocol::core::ProtocolCore, +}; + +pub struct TcpClientSync<T> +where + T: Transport, + T::Config: ClientConfig, +{ + pub(crate) stream_factory: Arc<T>, + pub(crate) config: Arc<T::Config>, + inner: Mutex<ProtocolCore>, + rt: Runtime, + stream: Option<T::Stream>, +} + +impl<T: Transport> TcpClientSync<T> { + fn new(stream_factory: Arc<T>, config: Arc<TcpClientConfig>) -> Self { + Self { + stream_factory, + config, + } + } +} + +#[async_trait] +impl<T> Client for TcpClientSync<T> +where + T: Transport + Debug, +{ + // async fn connect(&self) -> Result<(), IggyError> { + // let address = self.config.server_address(); + // let core = self.inner.lock().unwrap(); + // let cfg = self.config.clone(); + // let mut stream = self.stream.take(); + + // self.rt.block_on(async move { + // let current_state = core.state; + // if matches!( + // current_state, + // ClientState::Connected + // | ClientState::Authenticating + // | ClientState::Authenticated + // ) { + // debug!( + // "ConnectionDriver: Already connected (state: {:?}), completing waiter immediately", + // current_state + // ); + // return + // } + // if matches!(current_state, ClientState::Connecting) { + // debug!("ConnectionDriver: Already connecting, adding to waiters"); + // return + // } + + // core.desire_connect(address).map_err(|e| { + // error!("ConnectionDriver: desire_connect failed: {}", e.as_string()); + // io::Error::new(io::ErrorKind::ConnectionAborted, e.as_string()) + // })?; + + // stream = Some(T::connect(cfg, address)?); + + // core.on_connected().map_err(|e| { + // io::Error::new(io::ErrorKind::ConnectionRefused, e.as_string()) + // })?; + // debug!("ConnectionDriver: Connection established"); + // if !core.should_wait_auth() { + // return + // } + + // }); + + // Ok(()) + // } + + // async fn disconnect(&self) -> Result<(), IggyError> { + // let address = self.config.server_address(); + // let core = self.inner.lock().unwrap(); + // let cfg = self.config.clone(); + // let mut stream = self.stream.take(); + + // self.rt.block_on(async move { + + // }); + // } +} + +impl<T> TcpClientSync<T> +where + T: Transport + Debug, +{ + fn connect(core: &mut ProtocolCore, address: SocketAddr, config: Arc<T::Config>) -> Result<(), IggyError> { + let current_state = core.state; + if matches!( + current_state, + ClientState::Connected | ClientState::Authenticating | ClientState::Authenticated + ) { + debug!( + "TcpClientSync: Already connected (state: {:?}), completing waiter immediately", + current_state + ); + return Ok(()); + } + if matches!(current_state, ClientState::Connecting) { + debug!("TcpClientSync: Already connecting, adding to waiters"); + return Ok(()); + } + + core.desire_connect(address).map_err(|e| { + error!("TcpClientSync: desire_connect failed: {}", e.as_string()); + e + })?; + + let mut stream = T::connect(config, address).map_err(|e| { + error!("TcpClientSync: failed to establish tcp connection: {}", e.to_string()); + IggyError::CannotEstablishConnection + })?; + + core.on_connected()?; + + debug!("TcpClientSync: Connection established"); + if !core.should_wait_auth() { + return Ok(()); + } + + match core.poll_transmit() { + None => { + error!("TcpClientSync: incorrect core state, want auth command, got none"); + return Err(IggyError::IncorrectConnectionState) + } + Some(buf) => { + let data = [IoSlice::new(&buf.header), IoSlice::new(&buf.payload)]; + stream.write_vectored(&data); + stream.flush(); + trace!("Sent a TCP request, waiting for a response..."); + + for _ in 0..16 { + let mut recv_buffer = BytesMut::with_capacity(8192); + let n = { + match stream.read(&mut recv_buffer) { + Ok(0) => { + core.disconnect(); + return Err(IggyError::CannotEstablishConnection); + } + Ok(n) => n, + Err(e) => { + error!("TcpClientSync: got error on command: {}", e); + // TODO change error + return Err(IggyError::CannotEstablishConnection); + } + } + + }; + let mut status; + core.process_incoming_with(&mut recv_buffer, |req_id, status, payload| { + if status == 0 { + return Err(IggyError::from_code(status)); + } + return Ok(()); + }); + } + + + return Ok(()) + } + } + } + + // fn drive_command() +}
