This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch draft-sans
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit fdccf3c01dae1bb1f79fb30f46df7cf0dc7efb17
Author: haze518 <[email protected]>
AuthorDate: Fri Jul 11 08:40:11 2025 +0600

    after binary
---
 core/integration/tests/sdk/producer/mod.rs | 100 +++++++++++++++++++++++++++++
 core/sdk/src/connection/mod.rs             |   2 +-
 core/sdk/src/connection/quic/mod.rs        |   2 +-
 core/sdk/src/connection/tcp/tcp.rs         |  12 +++-
 core/sdk/src/driver/tcp.rs                 |  15 +++++
 core/sdk/src/proto/connection.rs           |  37 ++++++++---
 core/sdk/src/proto/runtime.rs              |  12 ++++
 core/sdk/src/transport_adapter/async.rs    |  95 +++++++++++++++++++++------
 8 files changed, 244 insertions(+), 31 deletions(-)

diff --git a/core/integration/tests/sdk/producer/mod.rs b/core/integration/tests/sdk/producer/mod.rs
index e1b04539..b022634a 100644
--- a/core/integration/tests/sdk/producer/mod.rs
+++ b/core/integration/tests/sdk/producer/mod.rs
@@ -18,9 +18,17 @@
 
 mod background;
 
+use std::sync::Arc;
+
 use bytes::Bytes;
 use iggy::clients::client::IggyClient;
+use iggy::connection::tcp::tcp::TokioTcpFactory;
+use iggy::driver::tcp::TokioTcpDriver;
 use iggy::prelude::*;
+use iggy::proto::connection::{IggyCore, IggyCoreConfig};
+use iggy::proto::runtime::{sync, TokioRuntime};
+use iggy::transport_adapter::r#async::AsyncTransportAdapter;
+use integration::test_server::TestServer;
 
 const STREAM_ID: u32 = 1;
 const TOPIC_ID: u32 = 1;
@@ -62,3 +70,95 @@ async fn cleanup(system_client: &IggyClient) {
         .await
         .unwrap();
 }
+
+
+async fn async_send() {
+    let mut test_server = TestServer::default();
+    test_server.start();
+
+    let tcp_client_config = TcpClientConfig {
+        server_address: test_server.get_raw_tcp_addr().unwrap(),
+        ..TcpClientConfig::default()
+    };
+    
+    let tcp_factory = Arc::new(TokioTcpFactory::create(Arc::new(tcp_client_config)));
+    let core = Arc::new(sync::Mutex::new(IggyCore::new(IggyCoreConfig::default())));
+    let rt: Arc<TokioRuntime> = Arc::new(TokioRuntime{});
+    let notify = Arc::new(sync::Notify::new());
+    let dirver = TokioTcpDriver::new(core, rt.clone(), notify.clone(), tcp_factory);
+    let adapter = AsyncTransportAdapter::new(tcp_factory, rt, core, dirver, notify);
+
+    adapter.connect().await;
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+    
+
+    let client = Box::new(TcpClient::create(Arc::new(tcp_client_config)).unwrap());
+    let client = IggyClient::create(client, None, None);
+
+    client.connect().await.unwrap();
+    assert!(client.ping().await.is_ok(), "Failed to ping server");
+
+    login_root(&client).await;
+    init_system(&client).await;
+
+    client.connect().await.unwrap();
+    assert!(client.ping().await.is_ok(), "Failed to ping server");
+
+    let messages_count = 1000;
+
+    let mut messages = Vec::new();
+    for offset in 0..messages_count {
+        let id = (offset + 1) as u128;
+        let payload = create_message_payload(offset as u64);
+        messages.push(
+            IggyMessage::builder()
+                .id(id)
+                .payload(payload)
+                .build()
+                .expect("Failed to create message with headers"),
+        );
+    }
+
+    let producer = client
+        .producer(&STREAM_ID.to_string(), &TOPIC_ID.to_string())
+        .unwrap()
+        .partitioning(Partitioning::partition_id(PARTITION_ID))
+        .background(BackgroundConfig::builder().build())
+        .build();
+
+    producer.send(messages).await.unwrap();
+    sleep(Duration::from_millis(500)).await;
+    producer.shutdown().await;
+
+    let consumer = Consumer::default();
+    let polled_messages = client
+        .poll_messages(
+            &Identifier::numeric(STREAM_ID).unwrap(),
+            &Identifier::numeric(TOPIC_ID).unwrap(),
+            Some(PARTITION_ID),
+            &consumer,
+            &PollingStrategy::offset(0),
+            messages_count,
+            false,
+        )
+        .await
+        .unwrap();
+
+    assert_eq!(polled_messages.messages.len() as u32, messages_count);
+    cleanup(&client).await;
+}
diff --git a/core/sdk/src/connection/mod.rs b/core/sdk/src/connection/mod.rs
index d52a3540..d9024e5c 100644
--- a/core/sdk/src/connection/mod.rs
+++ b/core/sdk/src/connection/mod.rs
@@ -8,7 +8,7 @@ pub mod quic;
 
 pub trait ConnectionFactory {
     fn connect(&self) -> Pin<Box<dyn Future<Output = Result<(), IggyError>> + Send + Sync>>;
-    fn is_alive(&self) -> Pin<Box<dyn Future<Output = bool>>>;
+    fn is_alive(&self) -> Pin<Box<dyn Future<Output = bool> + Send + Sync>>;
     fn shutdown(&self) -> Pin<Box<dyn Future<Output = Result<(), IggyError>> + Send + Sync>>;
 }
 
diff --git a/core/sdk/src/connection/quic/mod.rs b/core/sdk/src/connection/quic/mod.rs
index 64e7e953..1733c3b8 100644
--- a/core/sdk/src/connection/quic/mod.rs
+++ b/core/sdk/src/connection/quic/mod.rs
@@ -70,7 +70,7 @@ impl ConnectionFactory for QuinnFactory {
         })
     }
 
-    fn is_alive(&self) -> Pin<Box<dyn Future<Output = bool>>> {
+    fn is_alive(&self) -> Pin<Box<dyn Future<Output = bool> + Send + Sync>> {
         let conn = self.connection.clone();
         Box::pin(async move {
             let conn = conn.lock().await;
diff --git a/core/sdk/src/connection/tcp/tcp.rs b/core/sdk/src/connection/tcp/tcp.rs
index cf22eb16..d2e6f563 100644
--- a/core/sdk/src/connection/tcp/tcp.rs
+++ b/core/sdk/src/connection/tcp/tcp.rs
@@ -56,7 +56,7 @@ impl ConnectionFactory for TokioTcpFactory {
     }
 
     // TODO пока заглушка, нужно подумать насчет того, как это делать
-    fn is_alive(&self) -> std::pin::Pin<Box<dyn Future<Output = bool>>> {
+    fn is_alive(&self) -> std::pin::Pin<Box<dyn Future<Output = bool> + Send + Sync>> {
         let conn = self.stream.clone();
         Box::pin(async move {
             let conn = conn.lock().await;
@@ -82,3 +82,13 @@ impl ConnectionFactory for TokioTcpFactory {
         })
     }
 }
+
+impl TokioTcpFactory {
+    pub fn create(config: Arc<TcpClientConfig>) -> Self {
+        Self {
+            config,
+            client_address: Arc::new(sync::Mutex::new(None)),
+            stream: Arc::new(sync::Mutex::new(None)), 
+        }
+    }
+}
\ No newline at end of file
diff --git a/core/sdk/src/driver/tcp.rs b/core/sdk/src/driver/tcp.rs
index c81140ee..d6809edf 100644
--- a/core/sdk/src/driver/tcp.rs
+++ b/core/sdk/src/driver/tcp.rs
@@ -18,6 +18,21 @@ where
     pending: Arc<DashMap<u64, sync::OneShotSender<Bytes>>>,
 }
 
+impl<R> TokioTcpDriver<R>
+where
+    R: Runtime
+{
+    pub fn new(core: Arc<sync::Mutex<IggyCore>>, runtime: Arc<R>, notify: Arc<sync::Notify>, factory: Arc<TokioTcpFactory>) -> Self {
+        Self {
+            core,
+            rt: runtime,
+            notify,
+            factory,
+            pending: Arc::new(DashMap::new()),
+        }
+    }
+}
+
 impl<R> Driver for TokioTcpDriver<R>
 where
     R: Runtime
diff --git a/core/sdk/src/proto/connection.rs b/core/sdk/src/proto/connection.rs
index de7cee74..8d5e396d 100644
--- a/core/sdk/src/proto/connection.rs
+++ b/core/sdk/src/proto/connection.rs
@@ -1,4 +1,4 @@
-use std::{collections::VecDeque, io::Cursor, pin::Pin, sync::Arc};
+use std::{collections::VecDeque, io::Cursor, pin::Pin, str::FromStr, sync::Arc};
 
 use bytes::{Buf, BufMut, Bytes, BytesMut};
 use iggy_common::{ClientState, Command, IggyDuration, IggyError, IggyErrorDiscriminants, IggyTimestamp};
@@ -18,9 +18,15 @@ const ALREADY_EXISTS_STATUSES: &[u32] = &[
     IggyErrorDiscriminants::ConsumerGroupNameAlreadyExists as u32,
 ];
 
-pub trait TransportConfig {
-    fn resstablish_after(&self) -> IggyDuration;
-    fn max_retries(&self) -> Option<u32>;
+pub struct IggyCoreConfig {
+    max_retries: Option<u32>,
+    reestablish_after: IggyDuration,
+}
+
+impl Default for IggyCoreConfig {
+    fn default() -> Self {
+        Self { max_retries: None, reestablish_after: IggyDuration::from_str("5s").unwrap() }
+    }
 }
 
 pub struct TxBuf {
@@ -58,16 +64,27 @@ pub enum InboundResult {
 }
 
 pub struct IggyCore {
-    state: ClientState,
+    pub(crate) state: ClientState,
     last_connect: Option<IggyTimestamp>,
     pending: VecDeque<(u32 /* code */, Bytes /* payload */, u64 /* transport_id */)>,
-    config: Arc<dyn TransportConfig + Send + Sync + 'static>, // todo rewrite via generic
+    config: IggyCoreConfig,
     retry_count: u32,
     current_tx: Option<Arc<TxBuf>>,
 }
 
 impl IggyCore {
-    pub fn write(&mut self, cmd: &impl Command, id: u64) -> Result<(), IggyError> {
+    pub fn new(config: IggyCoreConfig) -> Self {
+        Self {
+            state: ClientState::Disconnected,
+            last_connect: None,
+            pending: VecDeque::new(),
+            config,
+            retry_count: 0,
+            current_tx: None,
+        }
+    }
+
+    pub fn write(&mut self, code: u32, payload: Bytes, id: u64) -> Result<(), IggyError> {
         match self.state {
             ClientState::Shutdown => {
                 trace!("Cannot send data. Client is shutdown.");
@@ -83,7 +100,7 @@ impl IggyCore {
             }
             _ => {}
         }
-        self.pending.push_back((cmd.code(), cmd.to_bytes(), id));
+        self.pending.push_back((code, payload, id));
         Ok(())
     }
 
@@ -118,7 +135,7 @@ impl IggyCore {
 
         self.state = ClientState::Connecting;
 
-        if let Some(max_retries) = self.config.max_retries() {
+        if let Some(max_retries) = self.config.max_retries {
             if self.retry_count >= max_retries {
                 self.state = ClientState::Disconnected;
                 return Err(IggyError::CannotEstablishConnection);
@@ -128,7 +145,7 @@ impl IggyCore {
         if let Some(last_connect) = self.last_connect {
             let now = IggyTimestamp::now();
             let elapsed = now.as_micros() - last_connect.as_micros();
-            let interval = self.config.resstablish_after().as_micros();
+            let interval = self.config.reestablish_after.as_micros();
             if elapsed < interval {
                 let remaining = IggyDuration::from(interval - elapsed);
                 return Ok(Order::Wait(remaining));
diff --git a/core/sdk/src/proto/runtime.rs b/core/sdk/src/proto/runtime.rs
index b3227bf2..022a5adf 100644
--- a/core/sdk/src/proto/runtime.rs
+++ b/core/sdk/src/proto/runtime.rs
@@ -25,3 +25,15 @@ pub fn oneshot<T>() -> (sync::OneShotSender<T>, sync::OneShotReceiver<T>) {
 pub fn notify() -> sync::Notify {
     tokio::sync::Notify::new()
 }
+
+pub struct TokioRuntime;
+
+impl Runtime for TokioRuntime {
+    fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
+        tokio::spawn(future);
+    }
+
+    fn sleep(&self, duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send>> {
+        Box::pin(tokio::time::sleep(duration))
+    }
+}
diff --git a/core/sdk/src/transport_adapter/async.rs b/core/sdk/src/transport_adapter/async.rs
index 33b0a27d..2ca08c89 100644
--- a/core/sdk/src/transport_adapter/async.rs
+++ b/core/sdk/src/transport_adapter/async.rs
@@ -1,11 +1,12 @@
 use std::{
-    pin::Pin,
-    sync::{Arc, Mutex, atomic::AtomicU64},
+    pin::Pin, str::FromStr, sync::{atomic::AtomicU64, Arc, Mutex}
 };
 use async_broadcast::{Receiver, Sender, broadcast};
 
+use async_trait::async_trait;
 use bytes::Bytes;
-use iggy_common::{Command, DiagnosticEvent, IggyError};
+use iggy_binary_protocol::BinaryTransport;
+use iggy_common::{ClientState, Command, DiagnosticEvent, IggyDuration, IggyError};
 use tokio::sync::Notify;
 use tracing::{error, trace};
 
@@ -22,7 +23,7 @@ use crate::{
 pub struct AsyncTransportAdapter<F: ConnectionFactory, R: Runtime, D: Driver> {
     factory: Arc<F>,
     rt: Arc<R>,
-    core: sync::Mutex<IggyCore>,
+    core: Arc<sync::Mutex<IggyCore>>,
     notify: Arc<Notify>,
     id: AtomicU64,
     driver: Arc<D>,
@@ -35,18 +36,30 @@ where
     R: Runtime + Send + Sync + 'static,
     D: Driver + Send + Sync,
 {
-    async fn send_with_response<T: Command>(&self, command: &T) -> Result<RespFut, IggyError> {
-            self.ensure_connected().await?;
+    pub fn new(factory: Arc<F>, runtime: Arc<R>, core: Arc<sync::Mutex<IggyCore>>, driver: D, notify: Arc<Notify>) -> Self {
+        driver.start();
+        Self {
+            factory: factory,
+            rt: runtime,
+            core,
+            notify,
+            id: AtomicU64::new(0),
+            driver: Arc::new(driver),
+            events: broadcast(1000),
+        }
+    }
+    // async fn send_with_response<T: Command>(&self, command: &T) -> Result<RespFut, IggyError> {
+    //         self.ensure_connected().await?;
     
-            let (tx, rx) = runtime::oneshot::<Bytes>();
-            let current_id = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+    //         let (tx, rx) = runtime::oneshot::<Bytes>();
+    //         let current_id = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
 
-            self.core.lock().await.write(command, current_id)?;
-            self.driver.register(current_id, tx);
-            self.notify.notify_waiters();
+    //         self.core.lock().await.write(command, current_id)?;
+    //         self.driver.register(current_id, tx);
+    //         self.notify.notify_waiters();
 
-            Ok(RespFut { rx: rx })
-    }
+    //         Ok(RespFut { rx: rx })
+    // }
 
     pub async fn connect(&self) -> Result<(), IggyError> {
         let mut order = self.core.lock().await.start_connect()?;
@@ -83,11 +96,11 @@ where
         }
     }
 
-    async fn publish_event(&self, event: DiagnosticEvent) {
-        if let Err(error) = self.events.0.broadcast(event).await {
-            error!("Failed to send a QUIC diagnostic event: {error}");
-        }
-    }
+    // async fn publish_event(&self, event: DiagnosticEvent) {
+    //     if let Err(error) = self.events.0.broadcast(event).await {
+    //         error!("Failed to send a QUIC diagnostic event: {error}");
+    //     }
+    // }
 
     async fn ensure_connected(&self) -> Result<(), IggyError> {
         if self.factory.is_alive().await {
@@ -106,3 +119,49 @@ where
 
     // TODO add async fn login
 }
+
+#[async_trait]
+impl<F, R, D> BinaryTransport for AsyncTransportAdapter<F, R, D>
+where
+    F: ConnectionFactory + Send + Sync + 'static,
+    R: Runtime + Send + Sync + 'static,
+    D: Driver + Send + Sync,
+{
+    async fn send_with_response<T: Command>(&self, command: &T) -> Result<Bytes, IggyError> {
+        command.validate()?;
+        self.send_raw_with_response(command.code(), command.to_bytes())
+            .await
+    }
+
+    async fn send_raw_with_response(&self, code: u32, payload: Bytes) -> Result<Bytes, IggyError> {
+            self.ensure_connected().await?;
+    
+            let (tx, rx) = runtime::oneshot::<Bytes>();
+            let current_id = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+
+            self.core.lock().await.write(code, payload, current_id)?;
+            self.driver.register(current_id, tx);
+            self.notify.notify_waiters();
+
+            let resp = RespFut{rx};
+            resp.await
+    }
+
+    fn get_heartbeat_interval(&self) -> IggyDuration {
+        IggyDuration::from_str("5s").unwrap()
+    }
+
+    /// Gets the state of the client.
+    async fn get_state(&self) -> ClientState {
+        self.core.lock().await.state
+    }
+
+    async fn set_state(&self, _state: ClientState) {
+    }
+
+    async fn publish_event(&self, event: DiagnosticEvent) {
+        if let Err(error) = self.events.0.broadcast(event).await {
+            error!("Failed to send a QUIC diagnostic event: {error}");
+        }
+    }
+}

Reply via email to