This is an automated email from the ASF dual-hosted git repository. bashirbekov pushed a commit to branch draft-sans in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 2e6833ac553632738b4d6746f4ca7166966418c1 Author: haze518 <[email protected]> AuthorDate: Fri Jul 11 07:41:12 2025 +0600 before binary transport --- core/sdk/src/connection/tcp/mod.rs | 6 +++++ core/sdk/src/connection/tcp/tcp.rs | 1 - core/sdk/src/driver/mod.rs | 2 +- core/sdk/src/driver/tcp.rs | 45 +++++++++++++++++++++++++++++++++----- core/sdk/src/proto/connection.rs | 4 ++++ 5 files changed, 51 insertions(+), 7 deletions(-) diff --git a/core/sdk/src/connection/tcp/mod.rs b/core/sdk/src/connection/tcp/mod.rs index c46b3ed6..5c07fffd 100644 --- a/core/sdk/src/connection/tcp/mod.rs +++ b/core/sdk/src/connection/tcp/mod.rs @@ -38,6 +38,12 @@ impl StreamPair for TokioTcpStream { ); IggyError::TcpError })?; + self.writer.flush().await.map_err(|e| { + error!( + "Failed to write data to the TCP connection: {e}", + ); + IggyError::TcpError + })?; Ok(()) }) } diff --git a/core/sdk/src/connection/tcp/tcp.rs b/core/sdk/src/connection/tcp/tcp.rs index c8ca9672..cf22eb16 100644 --- a/core/sdk/src/connection/tcp/tcp.rs +++ b/core/sdk/src/connection/tcp/tcp.rs @@ -71,7 +71,6 @@ impl ConnectionFactory for TokioTcpFactory { let conn = self.stream.clone(); Box::pin(async move { if let Some(mut conn) = conn.lock().await.take() { - // TODO в оригинале вызывается tcpStream.writer.shutdown, нужно понять есть ли разница conn.writer.shutdown().await.map_err(|e| { error!( "Failed to shutdown the TCP connection to the TCP connection: {e}", diff --git a/core/sdk/src/driver/mod.rs b/core/sdk/src/driver/mod.rs index f1e786f6..672274a8 100644 --- a/core/sdk/src/driver/mod.rs +++ b/core/sdk/src/driver/mod.rs @@ -46,7 +46,7 @@ where let cfg = self.config.clone(); let pending = self.pending.clone(); rt.spawn(Box::pin(async move { - let mut rx_buf = BytesMut::with_capacity(cfg.response_buffer_size as usize); + let mut rx_buf = BytesMut::new(); loop { nt.notified().await; diff --git a/core/sdk/src/driver/tcp.rs b/core/sdk/src/driver/tcp.rs index 641f9538..c81140ee 100644 --- a/core/sdk/src/driver/tcp.rs +++ b/core/sdk/src/driver/tcp.rs @@ -1,11 +1,11 @@ -use std::sync::Arc; +use std::{io::Cursor, sync::Arc}; use bytes::{Bytes, BytesMut}; use dashmap::DashMap; use iggy_common::IggyError; use tracing::error; -use crate::{connection::{tcp::tcp::TokioTcpFactory, StreamPair}, driver::Driver, proto::{connection::IggyCore, runtime::{sync, Runtime}}}; +use crate::{connection::{tcp::tcp::TokioTcpFactory, StreamPair}, driver::Driver, proto::{connection::{IggyCore, InboundResult}, runtime::{sync, Runtime}}}; pub struct TokioTcpDriver<R> where @@ -30,7 +30,7 @@ where let pending = self.pending.clone(); rt.spawn(Box::pin(async move { - let mut rx_buf = BytesMut::with_capacity(8); // RESPONSE_INITIAL_BYTES_LENGTH + let mut rx_buf = BytesMut::new(); loop { nt.notified().await; while let Some(data) = { @@ -50,7 +50,42 @@ where continue; } - ... + let init_len = core.lock().await.initial_bytes_len(); + let mut at_most = init_len; + loop { + rx_buf.reserve(at_most); + + match stream.read_buf(&mut rx_buf).await { + Ok(0) => { error!("EOF before header/body"); break } + Ok(n) => n, + Err(e) => { error!("read_buf failed: {e}"); break } + }; + + let buf = Cursor::new(&rx_buf[..]); + + let inbound = { + let mut guard = core.lock().await; + guard.feed_inbound(buf) + }; + + match inbound { + InboundResult::Need(need) => at_most = need, + InboundResult::Ready(position) => { + let frame = rx_buf.split_to(position).freeze(); + if let Some((_k, tx)) = pending.remove(&data.id) { + let _ = tx.send(frame); + } + core.lock().await.mark_tx_done(); + at_most = init_len; + continue; + } + InboundResult::Error(_) => { + pending.remove(&data.id); + core.lock().await.mark_tx_done(); + break; + } + } + } } } })); @@ -59,4 +94,4 @@ where fn register(&self, id: u64, tx: sync::OneShotSender<Bytes>) { self.pending.insert(id, tx); } -} \ No newline at end of file +} diff --git a/core/sdk/src/proto/connection.rs b/core/sdk/src/proto/connection.rs index 3ab1cca6..de7cee74 100644 --- a/core/sdk/src/proto/connection.rs +++ b/core/sdk/src/proto/connection.rs @@ -159,6 +159,10 @@ impl IggyCore { self.current_tx = None } + pub fn initial_bytes_len(&self) -> usize { + RESPONSE_INITIAL_BYTES_LENGTH + } + pub fn feed_inbound(&mut self, mut cur: Cursor<&[u8]>) -> InboundResult { let buf_len = cur.get_ref().len(); if buf_len < RESPONSE_INITIAL_BYTES_LENGTH {
