This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch draft-sans
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 2e6833ac553632738b4d6746f4ca7166966418c1
Author: haze518 <[email protected]>
AuthorDate: Fri Jul 11 07:41:12 2025 +0600

    before binary transport
---
 core/sdk/src/connection/tcp/mod.rs |  6 +++++
 core/sdk/src/connection/tcp/tcp.rs |  1 -
 core/sdk/src/driver/mod.rs         |  2 +-
 core/sdk/src/driver/tcp.rs         | 45 +++++++++++++++++++++++++++++++++-----
 core/sdk/src/proto/connection.rs   |  4 ++++
 5 files changed, 51 insertions(+), 7 deletions(-)

diff --git a/core/sdk/src/connection/tcp/mod.rs 
b/core/sdk/src/connection/tcp/mod.rs
index c46b3ed6..5c07fffd 100644
--- a/core/sdk/src/connection/tcp/mod.rs
+++ b/core/sdk/src/connection/tcp/mod.rs
@@ -38,6 +38,12 @@ impl StreamPair for TokioTcpStream {
                 );
                 IggyError::TcpError
             })?;
+            self.writer.flush().await.map_err(|e| {
+                error!(
+                    "Failed to write data to the TCP connection: {e}",
+                );
+                IggyError::TcpError
+            })?;
             Ok(())
         })
     }
diff --git a/core/sdk/src/connection/tcp/tcp.rs 
b/core/sdk/src/connection/tcp/tcp.rs
index c8ca9672..cf22eb16 100644
--- a/core/sdk/src/connection/tcp/tcp.rs
+++ b/core/sdk/src/connection/tcp/tcp.rs
@@ -71,7 +71,6 @@ impl ConnectionFactory for TokioTcpFactory {
         let conn = self.stream.clone();
         Box::pin(async move {
             if let Some(mut conn) = conn.lock().await.take() {
-                // TODO в оригинале вызывается tcpStream.writer.shutdown, 
нужно понять есть ли разница
                 conn.writer.shutdown().await.map_err(|e| {
                     error!(
                         "Failed to shutdown the TCP connection to the TCP 
connection: {e}",
diff --git a/core/sdk/src/driver/mod.rs b/core/sdk/src/driver/mod.rs
index f1e786f6..672274a8 100644
--- a/core/sdk/src/driver/mod.rs
+++ b/core/sdk/src/driver/mod.rs
@@ -46,7 +46,7 @@ where
         let cfg = self.config.clone();
         let pending = self.pending.clone();
         rt.spawn(Box::pin(async move {
-            let mut rx_buf = BytesMut::with_capacity(cfg.response_buffer_size 
as usize);
+            let mut rx_buf = BytesMut::new();
             loop {
                 nt.notified().await;
 
diff --git a/core/sdk/src/driver/tcp.rs b/core/sdk/src/driver/tcp.rs
index 641f9538..c81140ee 100644
--- a/core/sdk/src/driver/tcp.rs
+++ b/core/sdk/src/driver/tcp.rs
@@ -1,11 +1,11 @@
-use std::sync::Arc;
+use std::{io::Cursor, sync::Arc};
 
 use bytes::{Bytes, BytesMut};
 use dashmap::DashMap;
 use iggy_common::IggyError;
 use tracing::error;
 
-use crate::{connection::{tcp::tcp::TokioTcpFactory, StreamPair}, 
driver::Driver, proto::{connection::IggyCore, runtime::{sync, Runtime}}};
+use crate::{connection::{tcp::tcp::TokioTcpFactory, StreamPair}, 
driver::Driver, proto::{connection::{IggyCore, InboundResult}, runtime::{sync, 
Runtime}}};
 
 pub struct TokioTcpDriver<R>
 where
@@ -30,7 +30,7 @@ where
         let pending = self.pending.clone();
 
         rt.spawn(Box::pin(async move {
-            let mut rx_buf = BytesMut::with_capacity(8); // 
RESPONSE_INITIAL_BYTES_LENGTH
+            let mut rx_buf = BytesMut::new();
             loop {
                 nt.notified().await;
                 while let Some(data) = {
@@ -50,7 +50,42 @@ where
                         continue;
                     }
 
-                    ...
+                    let init_len = core.lock().await.initial_bytes_len();
+                    let mut at_most = init_len;
+                    loop {
+                        rx_buf.reserve(at_most);
+
+                        match stream.read_buf(&mut rx_buf).await {
+                            Ok(0)   => { error!("EOF before header/body"); 
break }
+                            Ok(n)   => n,
+                            Err(e)  => { error!("read_buf failed: {e}");   
break }
+                        };
+
+                        let buf = Cursor::new(&rx_buf[..]);
+
+                        let inbound = {
+                            let mut guard = core.lock().await;
+                            guard.feed_inbound(buf)
+                        };
+
+                        match inbound {
+                            InboundResult::Need(need) => at_most = need,
+                            InboundResult::Ready(position) => {
+                                let frame = rx_buf.split_to(position).freeze();
+                                if let Some((_k, tx)) = 
pending.remove(&data.id) {
+                                    let _ = tx.send(frame);
+                                }
+                                core.lock().await.mark_tx_done();
+                                at_most = init_len;
+                                continue;
+                            }
+                            InboundResult::Error(_) => {
+                                pending.remove(&data.id);
+                                core.lock().await.mark_tx_done();
+                                break;
+                            }
+                        }
+                    }
                 }
             }
         }));
@@ -59,4 +94,4 @@ where
     fn register(&self, id: u64, tx: sync::OneShotSender<Bytes>) {
         self.pending.insert(id, tx);
     }
-}
\ No newline at end of file
+}
diff --git a/core/sdk/src/proto/connection.rs b/core/sdk/src/proto/connection.rs
index 3ab1cca6..de7cee74 100644
--- a/core/sdk/src/proto/connection.rs
+++ b/core/sdk/src/proto/connection.rs
@@ -159,6 +159,10 @@ impl IggyCore {
         self.current_tx = None
     }
 
+    pub fn initial_bytes_len(&self) -> usize {
+        RESPONSE_INITIAL_BYTES_LENGTH
+    }
+
     pub fn feed_inbound(&mut self, mut cur: Cursor<&[u8]>) -> InboundResult {
         let buf_len = cur.get_ref().len();
         if buf_len < RESPONSE_INITIAL_BYTES_LENGTH {

Reply via email to