This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch draft-sans
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 35573cf13a42a506ffc3d80ce101ffeaddfcad51
Author: haze518 <[email protected]>
AuthorDate: Sun Jul 20 06:46:34 2025 +0600

    test
---
 core/sdk/src/connection/tcp/mod.rs      |  1 +
 core/sdk/src/driver/mod.rs              |  1 +
 core/sdk/src/driver/new_tcp.rs          | 62 +++++++++++++++++++++++
 core/sdk/src/proto/connection.rs        | 88 ++++++++++++++++++++++++++++++++-
 core/sdk/src/quic/quick_client.rs       |  2 +-
 core/sdk/src/tcp/tcp_client.rs          |  1 +
 core/sdk/src/transport_adapter/async.rs | 18 -------
 7 files changed, 152 insertions(+), 21 deletions(-)

diff --git a/core/sdk/src/connection/tcp/mod.rs b/core/sdk/src/connection/tcp/mod.rs
index 5f733990..81266d48 100644
--- a/core/sdk/src/connection/tcp/mod.rs
+++ b/core/sdk/src/connection/tcp/mod.rs
@@ -80,6 +80,7 @@ impl StreamPair for TokioTcpStream {
 impl TokioTcpStream {
     fn new(stream: TcpStream) -> Self {
         let (reader, writer) = stream.into_split();
+        writer.try_write(buf)
         Self {
             reader: sync::Mutex::new(BufReader::new(reader)),
             writer: sync::Mutex::new(BufWriter::new(writer)),
diff --git a/core/sdk/src/driver/mod.rs b/core/sdk/src/driver/mod.rs
index d77ae183..68eaf597 100644
--- a/core/sdk/src/driver/mod.rs
+++ b/core/sdk/src/driver/mod.rs
@@ -1,4 +1,5 @@
 pub mod tcp;
+pub mod new_tcp;
 
 use std::{io::Cursor, sync::Arc};
 
diff --git a/core/sdk/src/driver/new_tcp.rs b/core/sdk/src/driver/new_tcp.rs
new file mode 100644
index 00000000..2b14e6c5
--- /dev/null
+++ b/core/sdk/src/driver/new_tcp.rs
@@ -0,0 +1,62 @@
+use std::sync::Arc;
+
+use bytes::Bytes;
+use dashmap::DashMap;
+use iggy_common::{ClientState, IggyError};
+
+use crate::{connection::tcp::tcp::TokioTcpFactory, driver::Driver, proto::{connection::IggyCore, runtime::{sync::{self, OneShotReceiver}, Runtime}}};
+
+pub enum Action {
+    Command { code: u32, payload: Bytes, id: u64, rsp: sync::OneShotSender<Bytes> },
+    Connect { rsp: OneShotReceiver<Result<(), IggyError>> },
+    QueryState { rsp: OneShotReceiver<ClientState> },
+    Shutdown,
+}
+
+pub struct NewTokioTcpDriver<R: Runtime> {
+    core: Arc<IggyCore>,
+    rt: Arc<R>,
+    factory: Arc<TokioTcpFactory>,
+    pending: Arc<DashMap<u64, sync::OneShotSender<Bytes>>>,
+    rx_driver: flume::Receiver<Action>,
+}
+
+impl<R: Runtime> Driver for NewTokioTcpDriver<R> {
+    fn start(&self) {
+        let rt = self.rt.clone();
+        let core = self.core.clone(); // todo убрать clone
+        let factory = self.factory.clone();
+        let pending = self.pending.clone();
+        let rx_driver = self.rx_driver.clone();
+
+        rt.spawn(Box::pin(async move {
+            loop {
+                match rx_driver.recv_async().await {
+                    Ok(act) => {
+                        match act {
+                            Action::Command{code,payload,id,rsp} => {
+                                
+                            }
+                            Action::Connect { rsp } => {
+                                
+                            }
+                        }
+                    }
+                    Err(e) => {
+
+                    }
+                }
+            }
+        }));
+    }
+
+    fn register(&self, id: u64, tx: sync::OneShotSender<Bytes>) {
+        self.pending.insert(id, tx);
+    }
+}
+
+impl<R: Runtime> NewTokioTcpDriver<R> {
+    async fn ensure_connected(&self) {
+
+    }
+}
diff --git a/core/sdk/src/proto/connection.rs b/core/sdk/src/proto/connection.rs
index 62e86264..f5c28fe8 100644
--- a/core/sdk/src/proto/connection.rs
+++ b/core/sdk/src/proto/connection.rs
@@ -1,10 +1,15 @@
-use std::{collections::VecDeque, io::Cursor, pin::Pin, str::FromStr, sync::Arc};
+use std::{collections::VecDeque, io::{self, Cursor}, net::SocketAddr, pin::Pin, str::FromStr, sync::Arc, task::{Context, Waker}};
 
 use bytes::{Buf, BufMut, Bytes, BytesMut};
-use iggy_common::{ClientState, Command, IggyDuration, IggyError, IggyErrorDiscriminants, IggyTimestamp};
+use futures::AsyncWrite;
+use iggy_common::{ClientState, Command, DiagnosticEvent, IggyDuration, IggyError, IggyErrorDiscriminants, IggyTimestamp};
+use tokio::sync::{mpsc, oneshot};
+use tokio_util::io::poll_write_buf;
 use std::io::IoSlice;
 use tracing::{error, trace};
 
+use crate::proto::runtime::Runtime;
+
 const REQUEST_INITIAL_BYTES_LENGTH: usize = 4;
 const RESPONSE_INITIAL_BYTES_LENGTH: usize = 8;
 const ALREADY_EXISTS_STATUSES: &[u32] = &[
@@ -18,6 +23,83 @@ const ALREADY_EXISTS_STATUSES: &[u32] = &[
     IggyErrorDiscriminants::ConsumerGroupNameAlreadyExists as u32,
 ];
 
+pub enum StateKind {
+    Handshake,
+}
+
+pub struct Connection {
+    server_address: SocketAddr,
+    state: StateKind,
+    config: IggyCoreConfig,
+}
+
+impl Connection {
+    pub fn new(config: IggyCoreConfig, server_address: SocketAddr) -> Self {
+        Self { server_address, state: StateKind::Handshake, config }
+    }
+
+    pub fn poll_transmit(&mut self, buf: &mut Vec<u8>) -> Result<(), IggyError> {
+        match self.state {
+            ClientState::Shutdown => {
+                trace!("Cannot send data. Client is shutdown.");
+                return Err(IggyError::ClientShutdown);
+            }
+            ClientState::Disconnected => {
+                trace!("Cannot send data. Client is not connected.");
+                return Err(IggyError::NotConnected);
+            }
+            ClientState::Connecting => {
+                trace!("Cannot send data. Client is still connecting.");
+                return Err(IggyError::NotConnected);
+            }
+            _ => {}
+        }
+
+        let (code, payload, id) = self.pending.pop_front()?;
+        let len = (payload.len() + REQUEST_INITIAL_BYTES_LENGTH) as u32;
+
+        self.current_tx = Some(Arc::new(TxBuf{
+            hdr_len: len.to_le_bytes(),
+            hdr_code: code.to_le_bytes(),
+            payload, 
+            id,
+        }));
+
+    }
+}
+
+// TODO убрать из протокола
+pub struct Connecting {
+    conn: Connection,
+    events: mpsc::UnboundedSender<DiagnosticEvent>,
+}
+
+pub struct ConnectionRef {
+
+}
+
+
+pub struct State {
+    pub(crate) inner: Connection,
+    driver: Option<Waker>,
+    on_connected: Option<oneshot::Sender<bool>>,
+    connected: bool,
+    events: mpsc::UnboundedSender<DiagnosticEvent>,
+    pub(crate) blocked_writers: VecDeque<Waker>,
+    pub(crate) blocked_readers: VecDeque<Waker>,
+    pub(crate) error: Option<IggyError>,
+    runtime: Arc<dyn Runtime>,
+    send_buffer: Vec<u8>,
+    socket: Box<dyn AsyncWrite>
+}
+
+impl State {
+    fn drive_transmit(&mut self, cx: &mut Context) -> io::Result<bool> {
+        // todo парсим self.send_buffer через connection
+        self.socket::poll_write_buf(io, cx, buf)
+    }
+}
+
 #[derive(Debug)]
 pub struct IggyCoreConfig {
     max_retries: Option<u32>,
@@ -30,6 +112,8 @@ impl Default for IggyCoreConfig {
     }
 }
 
+//////////////////////////////////////////////////////////////////////////////////////////
+
 #[derive(Debug)]
 pub struct TxBuf {
     pub id: u64,
diff --git a/core/sdk/src/quic/quick_client.rs b/core/sdk/src/quic/quick_client.rs
index e3ce4419..a58595db 100644
--- a/core/sdk/src/quic/quick_client.rs
+++ b/core/sdk/src/quic/quick_client.rs
@@ -20,7 +20,7 @@ use crate::prelude::AutoLogin;
 use iggy_binary_protocol::{
     BinaryClient, BinaryTransport, Client, PersonalAccessTokenClient, UserClient,
 };
-use tokio::io::AsyncWriteExt;
+use tokio::io::{AsyncWrite, AsyncWriteExt};
 
 use crate::prelude::{IggyDuration, IggyError, IggyTimestamp, QuicClientConfig};
 use crate::quic::skip_server_verification::SkipServerVerification;
diff --git a/core/sdk/src/tcp/tcp_client.rs b/core/sdk/src/tcp/tcp_client.rs
index 8db50940..7a3e101b 100644
--- a/core/sdk/src/tcp/tcp_client.rs
+++ b/core/sdk/src/tcp/tcp_client.rs
@@ -200,6 +200,7 @@ impl TcpClient {
         })
     }
 
+    
     async fn handle_response(
         &self,
         status: u32,
diff --git a/core/sdk/src/transport_adapter/async.rs b/core/sdk/src/transport_adapter/async.rs
index d5602944..7dda9027 100644
--- a/core/sdk/src/transport_adapter/async.rs
+++ b/core/sdk/src/transport_adapter/async.rs
@@ -49,18 +49,6 @@ where
             events: broadcast(1000),
         }
     }
-    // async fn send_with_response<T: Command>(&self, command: &T) -> Result<RespFut, IggyError> {
-    //         self.ensure_connected().await?;
-    
-    //         let (tx, rx) = runtime::oneshot::<Bytes>();
-    //         let current_id = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
-
-    //         self.core.lock().await.write(command, current_id)?;
-    //         self.driver.register(current_id, tx);
-    //         self.notify.notify_waiters();
-
-    //         Ok(RespFut { rx: rx })
-    // }
 
     pub async fn connect(&self) -> Result<(), IggyError> {
         let mut order = self.core.lock().await.start_connect()?;
@@ -99,12 +87,6 @@ where
         }
     }
 
-    // async fn publish_event(&self, event: DiagnosticEvent) {
-    //     if let Err(error) = self.events.0.broadcast(event).await {
-    //         error!("Failed to send a QUIC diagnostic event: {error}");
-    //     }
-    // }
-
     async fn ensure_connected(&self) -> Result<(), IggyError> {
         if self.factory.is_alive().await {
             return Ok(())

Reply via email to