This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch add-sync-client
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 2ed7f93e0f7e0040f7904fd307028fecdc801c76
Author: haze518 <[email protected]>
AuthorDate: Sun Sep 14 14:10:24 2025 +0600

    del
---
 core/common/src/error/iggy_error.rs  |   6 +
 core/sdk/src/connection/mod.rs       |   1 +
 core/sdk/src/connection/transport.rs |  62 ++++++++
 core/sdk/src/lib.rs                  |   2 +
 core/sdk/src/protocol/core.rs        | 280 +++++++++++++++++++++++++++++++++++
 core/sdk/src/protocol/mod.rs         |   1 +
 core/sdk/src/tcp/mod.rs              |   2 +
 core/sdk/src/tcp/tcp_client_sync.rs  | 181 ++++++++++++++++++++++
 8 files changed, 535 insertions(+)

diff --git a/core/common/src/error/iggy_error.rs 
b/core/common/src/error/iggy_error.rs
index cc8a284b..2597e154 100644
--- a/core/common/src/error/iggy_error.rs
+++ b/core/common/src/error/iggy_error.rs
@@ -459,6 +459,12 @@ pub enum IggyError {
     CannotReadIndexPosition = 10011,
     #[error("Cannot read index timestamp")]
     CannotReadIndexTimestamp = 10012,
+    #[error("Max number of retry has exceeded")]
+    MaxRetriesExceeded = 10050,
+    #[error("Connection missed socket")]
+    ConnectionMissedSocket = 10051,
+    #[error("Incorrect connection state")]
+    IncorrectConnectionState = 10052,
 }
 
 impl IggyError {
diff --git a/core/sdk/src/connection/mod.rs b/core/sdk/src/connection/mod.rs
new file mode 100644
index 00000000..3feff26d
--- /dev/null
+++ b/core/sdk/src/connection/mod.rs
@@ -0,0 +1 @@
+pub(crate) mod transport;
diff --git a/core/sdk/src/connection/transport.rs 
b/core/sdk/src/connection/transport.rs
new file mode 100644
index 00000000..cd4a8071
--- /dev/null
+++ b/core/sdk/src/connection/transport.rs
@@ -0,0 +1,62 @@
+use std::io::{self, Read, Write};
+use std::net::{SocketAddr, TcpStream};
+use std::str::FromStr;
+use std::sync::Arc;
+
+use iggy_common::{AutoLogin, IggyDuration, TcpClientConfig};
+
+pub trait ClientConfig {
+    fn server_address(&self) -> SocketAddr;
+    fn auto_login(&self) -> AutoLogin;
+    fn reconnection_reestablish_after(&self) -> IggyDuration;
+    fn reconnection_max_retries(&self) -> Option<u32>;
+    fn heartbeat_interval(&self) -> IggyDuration;
+}
+
+impl ClientConfig for TcpClientConfig {
+    fn auto_login(&self) -> AutoLogin {
+        self.auto_login.clone()
+    }
+
+    fn heartbeat_interval(&self) -> IggyDuration {
+        self.heartbeat_interval
+    }
+
+    fn reconnection_max_retries(&self) -> Option<u32> {
+        self.reconnection.max_retries
+    }
+
+    fn reconnection_reestablish_after(&self) -> IggyDuration {
+        self.reconnection.reestablish_after
+    }
+
+    fn server_address(&self) -> SocketAddr {
+        SocketAddr::from_str(&self.server_address).unwrap()
+    }
+}
+
+pub trait Transport {
+    type Stream: Read + Write;
+    type Config: ClientConfig;
+
+    fn connect(
+        cfg: Arc<Self::Config>, // TODO maybe should remove arc
+        server_address: SocketAddr,
+    ) -> io::Result<Self::Stream>;
+}
+
+pub struct TcpTransport;
+
+impl Transport for TcpTransport {
+    type Stream = TcpStream;
+    type Config = TcpClientConfig;
+
+    fn connect(
+            cfg: Arc<Self::Config>, // TODO maybe should remove arc
+            server_address: SocketAddr,
+        ) -> io::Result<Self::Stream> {
+            let stream = TcpStream::connect(server_address)?;
+            stream.set_nodelay(cfg.nodelay)?;
+            Ok(stream)
+        }
+}
diff --git a/core/sdk/src/lib.rs b/core/sdk/src/lib.rs
index c315fd5e..52e7e118 100644
--- a/core/sdk/src/lib.rs
+++ b/core/sdk/src/lib.rs
@@ -26,3 +26,5 @@ pub mod prelude;
 pub mod quic;
 pub mod stream_builder;
 pub mod tcp;
+pub mod protocol;
+pub mod connection;
diff --git a/core/sdk/src/protocol/core.rs b/core/sdk/src/protocol/core.rs
new file mode 100644
index 00000000..95a8f697
--- /dev/null
+++ b/core/sdk/src/protocol/core.rs
@@ -0,0 +1,280 @@
+use std::{
+    collections::VecDeque,
+    net::SocketAddr,
+};
+
+use bytes::{Buf, BufMut, Bytes, BytesMut};
+use iggy_common::{AutoLogin, ClientState, Credentials, IggyDuration, 
IggyError, IggyTimestamp};
+use tracing::{debug, info, warn};
+
+const REQUEST_INITIAL_BYTES_LENGTH: usize = 4;
+const REQUEST_HEADER_BYTES: usize = 8;
+const RESPONSE_INITIAL_BYTES_LENGTH: usize = 8;
+#[derive(Debug)]
+pub struct ProtocolCoreConfig {
+    pub auto_login: AutoLogin,
+    pub reestablish_after: IggyDuration,
+    pub max_retries: Option<u32>,
+}
+
+#[derive(Debug)]
+pub enum ControlAction {
+    Connect(SocketAddr),
+    Wait(IggyDuration),
+    Authenticate { username: String, password: String },
+    Noop,
+    Error(IggyError),
+}
+
+pub struct TxBuf {
+    pub header: [u8; 8],
+    pub payload: Bytes,
+    pub request_id: u64,
+}
+
+impl TxBuf {
+    #[inline]
+    pub fn total_len(&self) -> usize {
+        REQUEST_HEADER_BYTES + self.payload.len()
+    }
+}
+
+#[derive(Debug)]
+pub struct ProtocolCore {
+    pub state: ClientState,
+    config: ProtocolCoreConfig,
+    last_connect_attempt: Option<IggyTimestamp>,
+    pub retry_count: u32,
+    next_request_id: u64,
+    pending_sends: VecDeque<(u32, Bytes, u64)>,
+    sent_order: VecDeque<u64>,
+    auth_pending: bool,
+    auth_request_id: Option<u64>,
+    server_address: Option<SocketAddr>,
+    last_auth_result: Option<Result<(), IggyError>>,
+}
+
+impl ProtocolCore {
+    pub fn new(config: ProtocolCoreConfig) -> Self {
+        Self {
+            state: ClientState::Disconnected,
+            config,
+            last_connect_attempt: None,
+            retry_count: 0,
+            next_request_id: 1,
+            pending_sends: VecDeque::new(),
+            sent_order: VecDeque::new(),
+            auth_pending: false,
+            auth_request_id: None,
+            server_address: None,
+            last_auth_result: None,
+        }
+    }
+
+    pub fn poll_transmit(&mut self) -> Option<TxBuf> {
+        if let Some((code, payload, request_id)) = 
self.pending_sends.pop_front() {
+            let total_len = (payload.len() + REQUEST_INITIAL_BYTES_LENGTH) as 
u32;
+            self.sent_order.push_back(request_id);
+
+            Some(TxBuf {
+                payload,
+                header: make_header(total_len, code),
+                request_id,
+            })
+        } else {
+            None
+        }
+    }
+
+    pub fn send(&mut self, code: u32, payload: Bytes) -> Result<u64, 
IggyError> {
+        match self.state {
+            ClientState::Shutdown => Err(IggyError::ClientShutdown),
+            ClientState::Disconnected | ClientState::Connecting => 
Err(IggyError::NotConnected),
+            ClientState::Connected | ClientState::Authenticating | 
ClientState::Authenticated => {
+                Ok(self.queue_send(code, payload))
+            }
+        }
+    }
+
+    fn queue_send(&mut self, code: u32, payload: Bytes) -> u64 {
+        let request_id = self.next_request_id;
+        self.next_request_id += 1;
+        self.pending_sends.push_back((code, payload, request_id));
+        request_id
+    }
+
+    pub fn process_incoming_with<F: FnMut(u64, u32, Bytes)>(
+        &mut self,
+        buf: &mut BytesMut,
+        mut f: F,
+    ) {
+        loop {
+            if buf.len() < RESPONSE_INITIAL_BYTES_LENGTH {
+                break;
+            }
+            let status = u32::from_le_bytes(buf[..4].try_into().unwrap());
+            let length = u32::from_le_bytes(buf[4..8].try_into().unwrap());
+            let total = RESPONSE_INITIAL_BYTES_LENGTH + length as usize;
+            if buf.len() < total {
+                break;
+            }
+
+            buf.advance(RESPONSE_INITIAL_BYTES_LENGTH);
+            let payload = if length <= 1 {
+                Bytes::new()
+            } else {
+                buf.split_to(length as usize).freeze()
+            };
+            if let Some(id) = self.on_response(status) {
+                f(id, status, payload);
+            }
+        }
+    }
+
+    pub fn on_response(&mut self, status: u32) -> Option<u64> {
+        let request_id = self.sent_order.pop_front()?;
+
+        if Some(request_id) == self.auth_request_id {
+            if status == 0 {
+                debug!("Authentication successful");
+                self.state = ClientState::Authenticated;
+                self.auth_pending = false;
+                self.last_auth_result = Some(Ok(()));
+            } else {
+                warn!("Authentication failed with status: {}", status);
+                self.state = ClientState::Connected;
+                self.auth_pending = false;
+                self.last_auth_result = Some(Err(IggyError::Unauthenticated));
+            }
+            self.auth_request_id = None;
+        }
+
+        Some(request_id)
+    }
+
+    pub fn poll(&mut self) -> ControlAction {
+        match self.state {
+            ClientState::Shutdown => 
ControlAction::Error(IggyError::ClientShutdown),
+            ClientState::Disconnected => ControlAction::Noop,
+            ClientState::Authenticated | ClientState::Authenticating | 
ClientState::Connected => {
+                ControlAction::Noop
+            }
+            ClientState::Connecting => {
+                let server_address = match self.server_address {
+                    Some(addr) => addr,
+                    None => return 
ControlAction::Error(IggyError::ConnectionMissedSocket),
+                };
+
+                if let Some(last) = self.last_connect_attempt {
+                    let now = IggyTimestamp::now();
+                    let elapsed = 
now.as_micros().saturating_sub(last.as_micros());
+                    if elapsed < self.config.reestablish_after.as_micros() {
+                        let remaining =
+                            
IggyDuration::from(self.config.reestablish_after.as_micros() - elapsed);
+                        return ControlAction::Wait(remaining);
+                    }
+                }
+
+                if let Some(max_retries) = self.config.max_retries {
+                    if self.retry_count >= max_retries {
+                        return 
ControlAction::Error(IggyError::MaxRetriesExceeded);
+                    }
+                }
+
+                self.retry_count += 1;
+                self.last_connect_attempt = Some(IggyTimestamp::now());
+
+                return ControlAction::Connect(server_address);
+            }
+        }
+    }
+
+    pub fn desire_connect(&mut self, server_address: SocketAddr) -> Result<(), 
IggyError> {
+        match self.state {
+            ClientState::Shutdown => return Err(IggyError::ClientShutdown),
+            ClientState::Connecting => return Ok(()),
+            ClientState::Connected | ClientState::Authenticating | 
ClientState::Authenticated => {
+                return Ok(());
+            }
+            _ => {
+                self.state = ClientState::Connecting;
+                self.server_address = Some(server_address);
+            }
+        }
+
+        Ok(())
+    }
+
+    pub fn on_connected(&mut self) -> Result<(), IggyError> {
+        debug!("Transport connected");
+        if self.state != ClientState::Connecting {
+            return Err(IggyError::IncorrectConnectionState);
+        }
+        self.state = ClientState::Connected;
+        self.retry_count = 0;
+
+        match &self.config.auto_login {
+            AutoLogin::Disabled => {
+                info!("Automatic sign-in is disabled.");
+            }
+            AutoLogin::Enabled(credentials) => {
+                if !self.auth_pending {
+                    self.state = ClientState::Authenticating;
+                    self.auth_pending = true;
+
+                    match credentials {
+                        Credentials::UsernamePassword(username, password) => {
+                            let auth_payload = encode_auth(&username, 
&password);
+                            let auth_id = self.queue_send(0x0A, auth_payload);
+                            self.auth_request_id = Some(auth_id);
+                        }
+                        _ => {
+                            todo!("add PersonalAccessToken")
+                        }
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    pub fn disconnect(&mut self) {
+        debug!("Transport disconnected");
+        self.state = ClientState::Disconnected;
+        self.auth_pending = false;
+        self.auth_request_id = None;
+        self.sent_order.clear();
+    }
+
+    pub fn shutdown(&mut self) {
+        self.state = ClientState::Shutdown;
+        self.auth_pending = false;
+        self.auth_request_id = None;
+        self.sent_order.clear();
+    }
+
+    pub fn should_wait_auth(&self) -> bool {
+        matches!(self.config.auto_login, AutoLogin::Enabled(_)) && 
self.auth_pending
+    }
+
+    pub fn take_auth_result(&mut self) -> Option<Result<(), IggyError>> {
+        self.last_auth_result.take()
+    }
+}
+
+fn encode_auth(username: &str, password: &str) -> Bytes {
+    let mut buf = BytesMut::new();
+    buf.put_u32_le(username.len() as u32);
+    buf.put_slice(username.as_bytes());
+    buf.put_u32_le(password.len() as u32);
+    buf.put_slice(password.as_bytes());
+    buf.freeze()
+}
+
+fn make_header(total_len: u32, code: u32) -> [u8; 8] {
+    let mut h = [0u8; 8];
+    h[..4].copy_from_slice(&total_len.to_le_bytes());
+    h[4..].copy_from_slice(&code.to_le_bytes());
+    h
+}
diff --git a/core/sdk/src/protocol/mod.rs b/core/sdk/src/protocol/mod.rs
new file mode 100644
index 00000000..90d7cc36
--- /dev/null
+++ b/core/sdk/src/protocol/mod.rs
@@ -0,0 +1 @@
+pub(crate) mod core;
diff --git a/core/sdk/src/tcp/mod.rs b/core/sdk/src/tcp/mod.rs
index c073ff0e..106cad71 100644
--- a/core/sdk/src/tcp/mod.rs
+++ b/core/sdk/src/tcp/mod.rs
@@ -22,3 +22,5 @@ pub(crate) mod tcp_connection_stream_kind;
 pub(crate) mod tcp_stream;
 pub(crate) mod tcp_tls_connection_stream;
 pub(crate) mod tcp_tls_verifier;
+
+pub(crate) mod tcp_client_sync;
diff --git a/core/sdk/src/tcp/tcp_client_sync.rs 
b/core/sdk/src/tcp/tcp_client_sync.rs
new file mode 100644
index 00000000..66c734ac
--- /dev/null
+++ b/core/sdk/src/tcp/tcp_client_sync.rs
@@ -0,0 +1,181 @@
+use std::{
+    fmt::Debug, io::{self, IoSlice, Read, Write}, net::SocketAddr, sync::{Arc, 
Mutex}
+};
+
+use bytes::BytesMut;
+use iggy_binary_protocol::Client;
+use iggy_common::{ClientState, IggyError, TcpClientConfig};
+use tokio::runtime::Runtime;
+use tracing::{debug, error, trace};
+
+use crate::{
+    connection::transport::{ClientConfig, Transport},
+    protocol::core::ProtocolCore,
+};
+
+pub struct TcpClientSync<T>
+where
+    T: Transport,
+    T::Config: ClientConfig,
+{
+    pub(crate) stream_factory: Arc<T>,
+    pub(crate) config: Arc<T::Config>,
+    inner: Mutex<ProtocolCore>,
+    rt: Runtime,
+    stream: Option<T::Stream>,
+}
+
+impl<T: Transport> TcpClientSync<T> {
+    fn new(stream_factory: Arc<T>, config: Arc<TcpClientConfig>) -> Self {
+        Self {
+            stream_factory,
+            config,
+        }
+    }
+}
+
+#[async_trait]
+impl<T> Client for TcpClientSync<T>
+where
+    T: Transport + Debug,
+{
+    // async fn connect(&self) -> Result<(), IggyError> {
+    //     let address = self.config.server_address();
+    //     let core = self.inner.lock().unwrap();
+    //     let cfg = self.config.clone();
+    //     let mut stream = self.stream.take();
+
+    //     self.rt.block_on(async move {
+    //         let current_state = core.state;
+    //             if matches!(
+    //                 current_state,
+    //                 ClientState::Connected
+    //                     | ClientState::Authenticating
+    //                     | ClientState::Authenticated
+    //             ) {
+    //                 debug!(
+    //                     "ConnectionDriver: Already connected (state: {:?}), 
completing waiter immediately",
+    //                     current_state
+    //                 );
+    //                 return
+    //             }
+    //         if matches!(current_state, ClientState::Connecting) {
+    //             debug!("ConnectionDriver: Already connecting, adding to 
waiters");
+    //             return
+    //         }
+
+    //         core.desire_connect(address).map_err(|e| {
+    //             error!("ConnectionDriver: desire_connect failed: {}", 
e.as_string());
+    //             io::Error::new(io::ErrorKind::ConnectionAborted, 
e.as_string())
+    //         })?;
+
+    //         stream = Some(T::connect(cfg, address)?);
+
+    //         core.on_connected().map_err(|e| {
+    //             io::Error::new(io::ErrorKind::ConnectionRefused, 
e.as_string())
+    //         })?;
+    //         debug!("ConnectionDriver: Connection established");
+    //         if !core.should_wait_auth() {
+    //             return
+    //         }
+
+    //     });
+
+    //     Ok(())
+    // }
+
+    // async  fn disconnect(&self) -> Result<(), IggyError> {
+    //     let address = self.config.server_address();
+    //     let core = self.inner.lock().unwrap();
+    //     let cfg = self.config.clone();
+    //     let mut stream = self.stream.take();
+
+    //     self.rt.block_on(async move {
+
+    //     });
+    // }
+}
+
+impl<T> TcpClientSync<T>
+where
+    T: Transport + Debug,
+{
+    fn connect(core: &mut ProtocolCore, address: SocketAddr, config: 
Arc<T::Config>) -> Result<(), IggyError> {
+        let current_state = core.state;
+        if matches!(
+            current_state,
+            ClientState::Connected | ClientState::Authenticating | 
ClientState::Authenticated
+        ) {
+            debug!(
+                "TcpClientSync: Already connected (state: {:?}), completing 
waiter immediately",
+                current_state
+            );
+            return Ok(());
+        }
+        if matches!(current_state, ClientState::Connecting) {
+            debug!("TcpClientSync: Already connecting, adding to waiters");
+            return Ok(());
+        }
+
+        core.desire_connect(address).map_err(|e| {
+            error!("TcpClientSync: desire_connect failed: {}", e.as_string());
+            e
+        })?;
+
+        let mut stream = T::connect(config, address).map_err(|e| {
+            error!("TcpClientSync: failed to establish tcp connection: {}", 
e.to_string());
+            IggyError::CannotEstablishConnection
+        })?;
+
+        core.on_connected()?;
+
+        debug!("TcpClientSync: Connection established");
+        if !core.should_wait_auth() {
+            return Ok(());
+        }
+
+        match core.poll_transmit() {
+            None => {
+                error!("TcpClientSync: incorrect core state, want auth 
command, got none");
+                return Err(IggyError::IncorrectConnectionState)
+            }
+            Some(buf) => {
+                let data = [IoSlice::new(&buf.header), 
IoSlice::new(&buf.payload)];
+                stream.write_vectored(&data);
+                stream.flush();
+                trace!("Sent a TCP request, waiting for a response...");
+
+                for _ in 0..16 {
+                    let mut recv_buffer = BytesMut::with_capacity(8192);
+                    let n = {
+                        match stream.read(&mut recv_buffer) {
+                            Ok(0) => {
+                                core.disconnect();
+                                return 
Err(IggyError::CannotEstablishConnection);
+                            }
+                            Ok(n) => n,
+                            Err(e) => {
+                                error!("TcpClientSync: got error on command: 
{}", e);
+                                // TODO change error
+                                return 
Err(IggyError::CannotEstablishConnection);
+                            }
+                        }
+
+                    };
+                    let mut status;
+                    core.process_incoming_with(&mut recv_buffer, |req_id, 
status, payload| {
+                        if status == 0 {
+                            return Err(IggyError::from_code(status));
+                        }
+                        return Ok(());
+                    });
+                }
+
+
+                return Ok(())
+            }
+        }
+    }
+
+    // fn drive_command()
+}

Reply via email to