This is an automated email from the ASF dual-hosted git repository. bashirbekov pushed a commit to branch draft-sans in repository https://gitbox.apache.org/repos/asf/iggy.git
commit fdccf3c01dae1bb1f79fb30f46df7cf0dc7efb17 Author: haze518 <[email protected]> AuthorDate: Fri Jul 11 08:40:11 2025 +0600 after binary --- core/integration/tests/sdk/producer/mod.rs | 100 +++++++++++++++++++++++++++++ core/sdk/src/connection/mod.rs | 2 +- core/sdk/src/connection/quic/mod.rs | 2 +- core/sdk/src/connection/tcp/tcp.rs | 12 +++- core/sdk/src/driver/tcp.rs | 15 +++++ core/sdk/src/proto/connection.rs | 37 ++++++++--- core/sdk/src/proto/runtime.rs | 12 ++++ core/sdk/src/transport_adapter/async.rs | 95 +++++++++++++++++++++------ 8 files changed, 244 insertions(+), 31 deletions(-) diff --git a/core/integration/tests/sdk/producer/mod.rs b/core/integration/tests/sdk/producer/mod.rs index e1b04539..b022634a 100644 --- a/core/integration/tests/sdk/producer/mod.rs +++ b/core/integration/tests/sdk/producer/mod.rs @@ -18,9 +18,17 @@ mod background; +use std::sync::Arc; + use bytes::Bytes; use iggy::clients::client::IggyClient; +use iggy::connection::tcp::tcp::TokioTcpFactory; +use iggy::driver::tcp::TokioTcpDriver; use iggy::prelude::*; +use iggy::proto::connection::{IggyCore, IggyCoreConfig}; +use iggy::proto::runtime::{sync, TokioRuntime}; +use iggy::transport_adapter::r#async::AsyncTransportAdapter; +use integration::test_server::TestServer; const STREAM_ID: u32 = 1; const TOPIC_ID: u32 = 1; @@ -62,3 +70,95 @@ async fn cleanup(system_client: &IggyClient) { .await .unwrap(); } + + +async fn async_send() { + let mut test_server = TestServer::default(); + test_server.start(); + + let tcp_client_config = TcpClientConfig { + server_address: test_server.get_raw_tcp_addr().unwrap(), + ..TcpClientConfig::default() + }; + + let tcp_factory = Arc::new(TokioTcpFactory::create(Arc::new(tcp_client_config))); + let core = Arc::new(sync::Mutex::new(IggyCore::new(IggyCoreConfig::default()))); + let rt: Arc<TokioRuntime> = Arc::new(TokioRuntime{}); + let notify = Arc::new(sync::Notify::new()); + let dirver = TokioTcpDriver::new(core, rt.clone(), notify.clone(), tcp_factory); + let adapter = AsyncTransportAdapter::new(tcp_factory, rt, core, dirver, notify); + + adapter.connect().await; + + + + + + + + + + + + + + + + + + + let client = Box::new(TcpClient::create(Arc::new(tcp_client_config)).unwrap()); + let client = IggyClient::create(client, None, None); + + client.connect().await.unwrap(); + assert!(client.ping().await.is_ok(), "Failed to ping server"); + + login_root(&client).await; + init_system(&client).await; + + client.connect().await.unwrap(); + assert!(client.ping().await.is_ok(), "Failed to ping server"); + + let messages_count = 1000; + + let mut messages = Vec::new(); + for offset in 0..messages_count { + let id = (offset + 1) as u128; + let payload = create_message_payload(offset as u64); + messages.push( + IggyMessage::builder() + .id(id) + .payload(payload) + .build() + .expect("Failed to create message with headers"), + ); + } + + let producer = client + .producer(&STREAM_ID.to_string(), &TOPIC_ID.to_string()) + .unwrap() + .partitioning(Partitioning::partition_id(PARTITION_ID)) + .background(BackgroundConfig::builder().build()) + .build(); + + producer.send(messages).await.unwrap(); + sleep(Duration::from_millis(500)).await; + producer.shutdown().await; + + let consumer = Consumer::default(); + let polled_messages = client + .poll_messages( + &Identifier::numeric(STREAM_ID).unwrap(), + &Identifier::numeric(TOPIC_ID).unwrap(), + Some(PARTITION_ID), + &consumer, + &PollingStrategy::offset(0), + messages_count, + false, + ) + .await + .unwrap(); + + assert_eq!(polled_messages.messages.len() as u32, messages_count); + cleanup(&client).await; +} diff --git a/core/sdk/src/connection/mod.rs b/core/sdk/src/connection/mod.rs index d52a3540..d9024e5c 100644 --- a/core/sdk/src/connection/mod.rs +++ b/core/sdk/src/connection/mod.rs @@ -8,7 +8,7 @@ pub mod quic; pub trait ConnectionFactory { fn connect(&self) -> Pin<Box<dyn Future<Output = Result<(), IggyError>> + Send + Sync>>; - fn is_alive(&self) -> Pin<Box<dyn Future<Output = bool>>>; + fn is_alive(&self) -> Pin<Box<dyn Future<Output = bool> + Send + Sync>>; fn shutdown(&self) -> Pin<Box<dyn Future<Output = Result<(), IggyError>> + Send + Sync>>; } diff --git a/core/sdk/src/connection/quic/mod.rs b/core/sdk/src/connection/quic/mod.rs index 64e7e953..1733c3b8 100644 --- a/core/sdk/src/connection/quic/mod.rs +++ b/core/sdk/src/connection/quic/mod.rs @@ -70,7 +70,7 @@ impl ConnectionFactory for QuinnFactory { }) } - fn is_alive(&self) -> Pin<Box<dyn Future<Output = bool>>> { + fn is_alive(&self) -> Pin<Box<dyn Future<Output = bool> + Send + Sync>> { let conn = self.connection.clone(); Box::pin(async move { let conn = conn.lock().await; diff --git a/core/sdk/src/connection/tcp/tcp.rs b/core/sdk/src/connection/tcp/tcp.rs index cf22eb16..d2e6f563 100644 --- a/core/sdk/src/connection/tcp/tcp.rs +++ b/core/sdk/src/connection/tcp/tcp.rs @@ -56,7 +56,7 @@ impl ConnectionFactory for TokioTcpFactory { } // TODO пока заглушка, нужно подумать насчет того, как это делать - fn is_alive(&self) -> std::pin::Pin<Box<dyn Future<Output = bool>>> { + fn is_alive(&self) -> std::pin::Pin<Box<dyn Future<Output = bool> + Send + Sync>> { let conn = self.stream.clone(); Box::pin(async move { let conn = conn.lock().await; @@ -82,3 +82,13 @@ impl ConnectionFactory for TokioTcpFactory { }) } } + +impl TokioTcpFactory { + pub fn create(config: Arc<TcpClientConfig>) -> Self { + Self { + config, + client_address: Arc::new(sync::Mutex::new(None)), + stream: Arc::new(sync::Mutex::new(None)), + } + } +} \ No newline at end of file diff --git a/core/sdk/src/driver/tcp.rs b/core/sdk/src/driver/tcp.rs index c81140ee..d6809edf 100644 --- a/core/sdk/src/driver/tcp.rs +++ b/core/sdk/src/driver/tcp.rs @@ -18,6 +18,21 @@ where pending: Arc<DashMap<u64, sync::OneShotSender<Bytes>>>, } +impl<R> TokioTcpDriver<R> +where + R: Runtime +{ + pub fn new(core: Arc<sync::Mutex<IggyCore>>, runtime: Arc<R>, notify: Arc<sync::Notify>, factory: Arc<TokioTcpFactory>) -> Self { + Self { + core, + rt: runtime, + notify, + factory, + pending: Arc::new(DashMap::new()), + } + } +} + impl<R> Driver for TokioTcpDriver<R> where R: Runtime diff --git a/core/sdk/src/proto/connection.rs b/core/sdk/src/proto/connection.rs index de7cee74..8d5e396d 100644 --- a/core/sdk/src/proto/connection.rs +++ b/core/sdk/src/proto/connection.rs @@ -1,4 +1,4 @@ -use std::{collections::VecDeque, io::Cursor, pin::Pin, sync::Arc}; +use std::{collections::VecDeque, io::Cursor, pin::Pin, str::FromStr, sync::Arc}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use iggy_common::{ClientState, Command, IggyDuration, IggyError, IggyErrorDiscriminants, IggyTimestamp}; @@ -18,9 +18,15 @@ const ALREADY_EXISTS_STATUSES: &[u32] = &[ IggyErrorDiscriminants::ConsumerGroupNameAlreadyExists as u32, ]; -pub trait TransportConfig { - fn resstablish_after(&self) -> IggyDuration; - fn max_retries(&self) -> Option<u32>; +pub struct IggyCoreConfig { + max_retries: Option<u32>, + reestablish_after: IggyDuration, +} + +impl Default for IggyCoreConfig { + fn default() -> Self { + Self { max_retries: None, reestablish_after: IggyDuration::from_str("5s").unwrap() } + } } pub struct TxBuf { @@ -58,16 +64,27 @@ pub enum InboundResult { } pub struct IggyCore { - state: ClientState, + pub(crate) state: ClientState, last_connect: Option<IggyTimestamp>, pending: VecDeque<(u32 /* code */, Bytes /* payload */, u64 /* transport_id */)>, - config: Arc<dyn TransportConfig + Send + Sync + 'static>, // todo rewrite via generic + config: IggyCoreConfig, retry_count: u32, current_tx: Option<Arc<TxBuf>>, } impl IggyCore { - pub fn write(&mut self, cmd: &impl Command, id: u64) -> Result<(), IggyError> { + pub fn new(config: IggyCoreConfig) -> Self { + Self { + state: ClientState::Disconnected, + last_connect: None, + pending: VecDeque::new(), + config, + retry_count: 0, + current_tx: None, + } + } + + pub fn write(&mut self, code: u32, payload: Bytes, id: u64) -> Result<(), IggyError> { match self.state { ClientState::Shutdown => { trace!("Cannot send data. Client is shutdown."); @@ -83,7 +100,7 @@ impl IggyCore { } _ => {} } - self.pending.push_back((cmd.code(), cmd.to_bytes(), id)); + self.pending.push_back((code, payload, id)); Ok(()) } @@ -118,7 +135,7 @@ impl IggyCore { self.state = ClientState::Connecting; - if let Some(max_retries) = self.config.max_retries() { + if let Some(max_retries) = self.config.max_retries { if self.retry_count >= max_retries { self.state = ClientState::Disconnected; return Err(IggyError::CannotEstablishConnection); @@ -128,7 +145,7 @@ impl IggyCore { if let Some(last_connect) = self.last_connect { let now = IggyTimestamp::now(); let elapsed = now.as_micros() - last_connect.as_micros(); - let interval = self.config.resstablish_after().as_micros(); + let interval = self.config.reestablish_after.as_micros(); if elapsed < interval { let remaining = IggyDuration::from(interval - elapsed); return Ok(Order::Wait(remaining)); diff --git a/core/sdk/src/proto/runtime.rs b/core/sdk/src/proto/runtime.rs index b3227bf2..022a5adf 100644 --- a/core/sdk/src/proto/runtime.rs +++ b/core/sdk/src/proto/runtime.rs @@ -25,3 +25,15 @@ pub fn oneshot<T>() -> (sync::OneShotSender<T>, sync::OneShotReceiver<T>) { pub fn notify() -> sync::Notify { tokio::sync::Notify::new() } + +pub struct TokioRuntime; + +impl Runtime for TokioRuntime { + fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) { + tokio::spawn(future); + } + + fn sleep(&self, duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send>> { + Box::pin(tokio::time::sleep(duration)) + } +} diff --git a/core/sdk/src/transport_adapter/async.rs b/core/sdk/src/transport_adapter/async.rs index 33b0a27d..2ca08c89 100644 --- a/core/sdk/src/transport_adapter/async.rs +++ b/core/sdk/src/transport_adapter/async.rs @@ -1,11 +1,12 @@ use std::{ - pin::Pin, - sync::{Arc, Mutex, atomic::AtomicU64}, + pin::Pin, str::FromStr, sync::{atomic::AtomicU64, Arc, Mutex} }; use async_broadcast::{Receiver, Sender, broadcast}; +use async_trait::async_trait; use bytes::Bytes; -use iggy_common::{Command, DiagnosticEvent, IggyError}; +use iggy_binary_protocol::BinaryTransport; +use iggy_common::{ClientState, Command, DiagnosticEvent, IggyDuration, IggyError}; use tokio::sync::Notify; use tracing::{error, trace}; @@ -22,7 +23,7 @@ use crate::{ pub struct AsyncTransportAdapter<F: ConnectionFactory, R: Runtime, D: Driver> { factory: Arc<F>, rt: Arc<R>, - core: sync::Mutex<IggyCore>, + core: Arc<sync::Mutex<IggyCore>>, notify: Arc<Notify>, id: AtomicU64, driver: Arc<D>, @@ -35,18 +36,30 @@ where R: Runtime + Send + Sync + 'static, D: Driver + Send + Sync, { - async fn send_with_response<T: Command>(&self, command: &T) -> Result<RespFut, IggyError> { - self.ensure_connected().await?; + pub fn new(factory: Arc<F>, runtime: Arc<R>, core: Arc<sync::Mutex<IggyCore>>, driver: D, notify: Arc<Notify>) -> Self { + driver.start(); + Self { + factory: factory, + rt: runtime, + core, + notify, + id: AtomicU64::new(0), + driver: Arc::new(driver), + events: broadcast(1000), + } + } + // async fn send_with_response<T: Command>(&self, command: &T) -> Result<RespFut, IggyError> { + // self.ensure_connected().await?; - let (tx, rx) = runtime::oneshot::<Bytes>(); - let current_id = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + // let (tx, rx) = runtime::oneshot::<Bytes>(); + // let current_id = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - self.core.lock().await.write(command, current_id)?; - self.driver.register(current_id, tx); - self.notify.notify_waiters(); + // self.core.lock().await.write(command, current_id)?; + // self.driver.register(current_id, tx); + // self.notify.notify_waiters(); - Ok(RespFut { rx: rx }) - } + // Ok(RespFut { rx: rx }) + // } pub async fn connect(&self) -> Result<(), IggyError> { let mut order = self.core.lock().await.start_connect()?; @@ -83,11 +96,11 @@ where } } - async fn publish_event(&self, event: DiagnosticEvent) { - if let Err(error) = self.events.0.broadcast(event).await { - error!("Failed to send a QUIC diagnostic event: {error}"); - } - } + // async fn publish_event(&self, event: DiagnosticEvent) { + // if let Err(error) = self.events.0.broadcast(event).await { + // error!("Failed to send a QUIC diagnostic event: {error}"); + // } + // } async fn ensure_connected(&self) -> Result<(), IggyError> { if self.factory.is_alive().await { @@ -106,3 +119,49 @@ where // TODO add async fn login } + +#[async_trait] +impl<F, R, D> BinaryTransport for AsyncTransportAdapter<F, R, D> +where + F: ConnectionFactory + Send + Sync + 'static, + R: Runtime + Send + Sync + 'static, + D: Driver + Send + Sync, +{ + async fn send_with_response<T: Command>(&self, command: &T) -> Result<Bytes, IggyError> { + command.validate()?; + self.send_raw_with_response(command.code(), command.to_bytes()) + .await + } + + async fn send_raw_with_response(&self, code: u32, payload: Bytes) -> Result<Bytes, IggyError> { + self.ensure_connected().await?; + + let (tx, rx) = runtime::oneshot::<Bytes>(); + let current_id = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + + self.core.lock().await.write(code, payload, current_id)?; + self.driver.register(current_id, tx); + self.notify.notify_waiters(); + + let resp = RespFut{rx}; + resp.await + } + + fn get_heartbeat_interval(&self) -> IggyDuration { + IggyDuration::from_str("5s").unwrap() + } + + /// Gets the state of the client. + async fn get_state(&self) -> ClientState { + self.core.lock().await.state + } + + async fn set_state(&self, _state: ClientState) { + } + + async fn publish_event(&self, event: DiagnosticEvent) { + if let Err(error) = self.events.0.broadcast(event).await { + error!("Failed to send a QUIC diagnostic event: {error}"); + } + } +}
