This is an automated email from the ASF dual-hosted git repository. bashirbekov pushed a commit to branch draft-sans in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 35573cf13a42a506ffc3d80ce101ffeaddfcad51 Author: haze518 <[email protected]> AuthorDate: Sun Jul 20 06:46:34 2025 +0600 test --- core/sdk/src/connection/tcp/mod.rs | 1 + core/sdk/src/driver/mod.rs | 1 + core/sdk/src/driver/new_tcp.rs | 62 +++++++++++++++++++++++ core/sdk/src/proto/connection.rs | 88 ++++++++++++++++++++++++++++++++- core/sdk/src/quic/quick_client.rs | 2 +- core/sdk/src/tcp/tcp_client.rs | 1 + core/sdk/src/transport_adapter/async.rs | 18 ------- 7 files changed, 152 insertions(+), 21 deletions(-) diff --git a/core/sdk/src/connection/tcp/mod.rs b/core/sdk/src/connection/tcp/mod.rs index 5f733990..81266d48 100644 --- a/core/sdk/src/connection/tcp/mod.rs +++ b/core/sdk/src/connection/tcp/mod.rs @@ -80,6 +80,7 @@ impl StreamPair for TokioTcpStream { impl TokioTcpStream { fn new(stream: TcpStream) -> Self { let (reader, writer) = stream.into_split(); + writer.try_write(buf) Self { reader: sync::Mutex::new(BufReader::new(reader)), writer: sync::Mutex::new(BufWriter::new(writer)), diff --git a/core/sdk/src/driver/mod.rs b/core/sdk/src/driver/mod.rs index d77ae183..68eaf597 100644 --- a/core/sdk/src/driver/mod.rs +++ b/core/sdk/src/driver/mod.rs @@ -1,4 +1,5 @@ pub mod tcp; +pub mod new_tcp; use std::{io::Cursor, sync::Arc}; diff --git a/core/sdk/src/driver/new_tcp.rs b/core/sdk/src/driver/new_tcp.rs new file mode 100644 index 00000000..2b14e6c5 --- /dev/null +++ b/core/sdk/src/driver/new_tcp.rs @@ -0,0 +1,62 @@ +use std::sync::Arc; + +use bytes::Bytes; +use dashmap::DashMap; +use iggy_common::{ClientState, IggyError}; + +use crate::{connection::tcp::tcp::TokioTcpFactory, driver::Driver, proto::{connection::IggyCore, runtime::{sync::{self, OneShotReceiver}, Runtime}}}; + +pub enum Action { + Command { code: u32, payload: Bytes, id: u64, rsp: sync::OneShotSender<Bytes> }, + Connect { rsp: OneShotReceiver<Result<(), IggyError>> }, + QueryState { rsp: OneShotReceiver<ClientState> }, + Shutdown, +} + +pub struct NewTokioTcpDriver<R: Runtime> { + core: Arc<IggyCore>, + rt: Arc<R>, + factory: Arc<TokioTcpFactory>, + pending: Arc<DashMap<u64, sync::OneShotSender<Bytes>>>, + rx_driver: flume::Receiver<Action>, +} + +impl<R: Runtime> Driver for NewTokioTcpDriver<R> { + fn start(&self) { + let rt = self.rt.clone(); + let core = self.core.clone(); // todo убрать clone + let factory = self.factory.clone(); + let pending = self.pending.clone(); + let rx_driver = self.rx_driver.clone(); + + rt.spawn(Box::pin(async move { + loop { + match rx_driver.recv_async().await { + Ok(act) => { + match act { + Action::Command{code,payload,id,rsp} => { + + } + Action::Connect { rsp } => { + + } + } + } + Err(e) => { + + } + } + } + })); + } + + fn register(&self, id: u64, tx: sync::OneShotSender<Bytes>) { + self.pending.insert(id, tx); + } +} + +impl<R: Runtime> NewTokioTcpDriver<R> { + async fn ensure_connected(&self) { + + } +} diff --git a/core/sdk/src/proto/connection.rs b/core/sdk/src/proto/connection.rs index 62e86264..f5c28fe8 100644 --- a/core/sdk/src/proto/connection.rs +++ b/core/sdk/src/proto/connection.rs @@ -1,10 +1,15 @@ -use std::{collections::VecDeque, io::Cursor, pin::Pin, str::FromStr, sync::Arc}; +use std::{collections::VecDeque, io::{self, Cursor}, net::SocketAddr, pin::Pin, str::FromStr, sync::Arc, task::{Context, Waker}}; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use iggy_common::{ClientState, Command, IggyDuration, IggyError, IggyErrorDiscriminants, IggyTimestamp}; +use futures::AsyncWrite; +use iggy_common::{ClientState, Command, DiagnosticEvent, IggyDuration, IggyError, IggyErrorDiscriminants, IggyTimestamp}; +use tokio::sync::{mpsc, oneshot}; +use tokio_util::io::poll_write_buf; use std::io::IoSlice; use tracing::{error, trace}; +use crate::proto::runtime::Runtime; + const REQUEST_INITIAL_BYTES_LENGTH: usize = 4; const RESPONSE_INITIAL_BYTES_LENGTH: usize = 8; const ALREADY_EXISTS_STATUSES: &[u32] = &[ @@ -18,6 +23,83 @@ const ALREADY_EXISTS_STATUSES: &[u32] = &[ IggyErrorDiscriminants::ConsumerGroupNameAlreadyExists as u32, ]; +pub enum StateKind { + Handshake, +} + +pub struct Connection { + server_address: SocketAddr, + state: StateKind, + config: IggyCoreConfig, +} + +impl Connection { + pub fn new(config: IggyCoreConfig, server_address: SocketAddr) -> Self { + Self { server_address, state: StateKind::Handshake, config } + } + + pub fn poll_transmit(&mut self, buf: &mut Vec<u8>) -> Result<(), IggyError> { + match self.state { + ClientState::Shutdown => { + trace!("Cannot send data. Client is shutdown."); + return Err(IggyError::ClientShutdown); + } + ClientState::Disconnected => { + trace!("Cannot send data. Client is not connected."); + return Err(IggyError::NotConnected); + } + ClientState::Connecting => { + trace!("Cannot send data. Client is still connecting."); + return Err(IggyError::NotConnected); + } + _ => {} + } + + let (code, payload, id) = self.pending.pop_front()?; + let len = (payload.len() + REQUEST_INITIAL_BYTES_LENGTH) as u32; + + self.current_tx = Some(Arc::new(TxBuf{ + hdr_len: len.to_le_bytes(), + hdr_code: code.to_le_bytes(), + payload, + id, + })); + + } +} + +// TODO убрать из протокола +pub struct Connecting { + conn: Connection, + events: mpsc::UnboundedSender<DiagnosticEvent>, +} + +pub struct ConnectionRef { + +} + + +pub struct State { + pub(crate) inner: Connection, + driver: Option<Waker>, + on_connected: Option<oneshot::Sender<bool>>, + connected: bool, + events: mpsc::UnboundedSender<DiagnosticEvent>, + pub(crate) blocked_writers: VecDeque<Waker>, + pub(crate) blocked_readers: VecDeque<Waker>, + pub(crate) error: Option<IggyError>, + runtime: Arc<dyn Runtime>, + send_buffer: Vec<u8>, + socket: Box<dyn AsyncWrite> +} + +impl State { + fn drive_transmit(&mut self, cx: &mut Context) -> io::Result<bool> { + // todo парсим self.send_buffer через connection + self.socket::poll_write_buf(io, cx, buf) + } +} + #[derive(Debug)] pub struct IggyCoreConfig { max_retries: Option<u32>, @@ -30,6 +112,8 @@ impl Default for IggyCoreConfig { } } +////////////////////////////////////////////////////////////////////////////////////////// + #[derive(Debug)] pub struct TxBuf { pub id: u64, diff --git a/core/sdk/src/quic/quick_client.rs b/core/sdk/src/quic/quick_client.rs index e3ce4419..a58595db 100644 --- a/core/sdk/src/quic/quick_client.rs +++ b/core/sdk/src/quic/quick_client.rs @@ -20,7 +20,7 @@ use crate::prelude::AutoLogin; use iggy_binary_protocol::{ BinaryClient, BinaryTransport, Client, PersonalAccessTokenClient, UserClient, }; -use tokio::io::AsyncWriteExt; +use tokio::io::{AsyncWrite, AsyncWriteExt}; use crate::prelude::{IggyDuration, IggyError, IggyTimestamp, QuicClientConfig}; use crate::quic::skip_server_verification::SkipServerVerification; diff --git a/core/sdk/src/tcp/tcp_client.rs b/core/sdk/src/tcp/tcp_client.rs index 8db50940..7a3e101b 100644 --- a/core/sdk/src/tcp/tcp_client.rs +++ b/core/sdk/src/tcp/tcp_client.rs @@ -200,6 +200,7 @@ impl TcpClient { }) } + async fn handle_response( &self, status: u32, diff --git a/core/sdk/src/transport_adapter/async.rs b/core/sdk/src/transport_adapter/async.rs index d5602944..7dda9027 100644 --- a/core/sdk/src/transport_adapter/async.rs +++ b/core/sdk/src/transport_adapter/async.rs @@ -49,18 +49,6 @@ where events: broadcast(1000), } } - // async fn send_with_response<T: Command>(&self, command: &T) -> Result<RespFut, IggyError> { - // self.ensure_connected().await?; - - // let (tx, rx) = runtime::oneshot::<Bytes>(); - // let current_id = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - - // self.core.lock().await.write(command, current_id)?; - // self.driver.register(current_id, tx); - // self.notify.notify_waiters(); - - // Ok(RespFut { rx: rx }) - // } pub async fn connect(&self) -> Result<(), IggyError> { let mut order = self.core.lock().await.start_connect()?; @@ -99,12 +87,6 @@ where } } - // async fn publish_event(&self, event: DiagnosticEvent) { - // if let Err(error) = self.events.0.broadcast(event).await { - // error!("Failed to send a QUIC diagnostic event: {error}"); - // } - // } - async fn ensure_connected(&self) -> Result<(), IggyError> { if self.factory.is_alive().await { return Ok(())
