This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/io_uring_tpc by this push:
     new 7090bc86e fix(io_uring): resolve clippy lints and add WebSocket to benchmark tool (#2256)
7090bc86e is described below

commit 7090bc86e61c0e2735c6969e172428f0ccd9fc8c
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Oct 13 15:00:19 2025 +0200

    fix(io_uring): resolve clippy lints and add WebSocket to benchmark tool (#2256)
    
    - Fix clippy warnings in websocket server code
    - Add WebSocket transport support to iggy-bench (alias: ws)
    - Update default WebSocket address to 8092
---
 core/bench/src/args/common.rs                      |  1 +
 core/bench/src/args/defaults.rs                    |  2 ++
 core/bench/src/args/transport.rs                   | 42 ++++++++++++++++++++++
 core/common/src/types/args/mod.rs                  |  4 +--
 .../tests/cli/general/test_help_command.rs         |  2 +-
 core/server/src/configs/websocket.rs               | 40 ++++++++++-----------
 core/server/src/websocket/connection_handler.rs    |  6 ++--
 core/server/src/websocket/websocket_listener.rs    |  3 +-
 8 files changed, 72 insertions(+), 28 deletions(-)

diff --git a/core/bench/src/args/common.rs b/core/bench/src/args/common.rs
index a31972f42..6dd36b20c 100644
--- a/core/bench/src/args/common.rs
+++ b/core/bench/src/args/common.rs
@@ -360,6 +360,7 @@ impl IggyBenchArgs {
             BenchmarkTransportCommand::Tcp(_) => "tcp",
             BenchmarkTransportCommand::Quic(_) => "quic",
             BenchmarkTransportCommand::Http(_) => "http",
+            BenchmarkTransportCommand::WebSocket(_) => "ws",
         };
 
         let actors = match &self.benchmark_kind {
diff --git a/core/bench/src/args/defaults.rs b/core/bench/src/args/defaults.rs
index ee2cbf8a4..7695782fb 100644
--- a/core/bench/src/args/defaults.rs
+++ b/core/bench/src/args/defaults.rs
@@ -29,6 +29,8 @@ pub const DEFAULT_QUIC_SERVER_ADDRESS: &str = "127.0.0.1:8080";
 pub const DEFAULT_QUIC_SERVER_NAME: &str = "localhost";
 pub const DEFAULT_QUIC_VALIDATE_CERTIFICATE: bool = false;
 
+pub const DEFAULT_WEBSOCKET_SERVER_ADDRESS: &str = "127.0.0.1:8092";
+
 pub const DEFAULT_MESSAGES_PER_BATCH: NonZeroU32 = u32!(1000);
 pub const DEFAULT_MESSAGE_BATCHES: NonZeroU32 = u32!(1000);
 pub const DEFAULT_MESSAGE_SIZE: NonZeroU32 = u32!(1000);
diff --git a/core/bench/src/args/transport.rs b/core/bench/src/args/transport.rs
index 7fa8ed42a..ab23ac356 100644
--- a/core/bench/src/args/transport.rs
+++ b/core/bench/src/args/transport.rs
@@ -19,6 +19,7 @@
 use super::defaults::{
     DEFAULT_HTTP_SERVER_ADDRESS, DEFAULT_QUIC_CLIENT_ADDRESS, DEFAULT_QUIC_SERVER_ADDRESS,
     DEFAULT_QUIC_SERVER_NAME, DEFAULT_QUIC_VALIDATE_CERTIFICATE, DEFAULT_TCP_SERVER_ADDRESS,
+    DEFAULT_WEBSOCKET_SERVER_ADDRESS,
 };
 use super::{output::BenchmarkOutputCommand, props::BenchmarkTransportProps};
 use clap::{Parser, Subcommand};
@@ -30,6 +31,8 @@ pub enum BenchmarkTransportCommand {
     Http(HttpArgs),
     Tcp(TcpArgs),
     Quic(QuicArgs),
+    #[command(alias = "ws")]
+    WebSocket(WebSocketArgs),
 }
 
 impl Serialize for BenchmarkTransportCommand {
@@ -41,6 +44,7 @@ impl Serialize for BenchmarkTransportCommand {
             Self::Http(_) => "http",
             Self::Tcp(_) => "tcp",
             Self::Quic(_) => "quic",
+            Self::WebSocket(_) => "websocket",
         };
         serializer.serialize_str(variant_str)
     }
@@ -72,6 +76,7 @@ impl BenchmarkTransportProps for BenchmarkTransportCommand {
             Self::Http(args) => args,
             Self::Tcp(args) => args,
             Self::Quic(args) => args,
+            Self::WebSocket(args) => args,
         }
     }
 
@@ -222,3 +227,40 @@ impl BenchmarkTransportProps for QuicArgs {
         self.output.as_ref()
     }
 }
+
+#[derive(Parser, Debug, Clone)]
+pub struct WebSocketArgs {
+    /// Address of the WebSocket iggy-server
+    #[arg(long, default_value_t = DEFAULT_WEBSOCKET_SERVER_ADDRESS.to_owned())]
+    pub server_address: String,
+
+    /// Optional output command, used to output results (charts, raw json data) to a directory
+    #[command(subcommand)]
+    pub output: Option<BenchmarkOutputCommand>,
+}
+
+impl BenchmarkTransportProps for WebSocketArgs {
+    fn transport(&self) -> &TransportProtocol {
+        &TransportProtocol::WebSocket
+    }
+
+    fn server_address(&self) -> &str {
+        &self.server_address
+    }
+
+    fn validate_certificate(&self) -> bool {
+        panic!("Cannot validate certificate for WebSocket transport!")
+    }
+
+    fn client_address(&self) -> &str {
+        panic!("Setting client address for WebSocket transport is not supported!")
+    }
+
+    fn nodelay(&self) -> bool {
+        panic!("Setting nodelay for WebSocket transport is not supported!")
+    }
+
+    fn output_command(&self) -> Option<&BenchmarkOutputCommand> {
+        self.output.as_ref()
+    }
+}
diff --git a/core/common/src/types/args/mod.rs b/core/common/src/types/args/mod.rs
index bba968d24..e4e1a56f1 100644
--- a/core/common/src/types/args/mod.rs
+++ b/core/common/src/types/args/mod.rs
@@ -201,7 +201,7 @@ pub struct ArgsOptional {
 
     /// The optional server address for the WebSocket transport
     ///
-    /// [default: 127.0.0.1:8095]
+    /// [default: 127.0.0.1:8092]
     #[arg(long)]
     #[serde(skip_serializing_if = "Option::is_none")]
     pub websocket_server_address: Option<String>,
@@ -403,7 +403,7 @@ impl Default for Args {
             quic_max_idle_timeout: 10000,
             quic_validate_certificate: false,
             quic_heartbeat_interval: "5s".to_string(),
-            websocket_server_address: "127.0.0.1:8095".to_string(),
+            websocket_server_address: "127.0.0.1:8092".to_string(),
             websocket_reconnection_enabled: true,
             websocket_reconnection_max_retries: None,
             websocket_reconnection_interval: "1s".to_string(),
diff --git a/core/integration/tests/cli/general/test_help_command.rs b/core/integration/tests/cli/general/test_help_command.rs
index 623b546e1..9ae5fcb95 100644
--- a/core/integration/tests/cli/general/test_help_command.rs
+++ b/core/integration/tests/cli/general/test_help_command.rs
@@ -177,7 +177,7 @@ Options:
       --websocket-server-address <WEBSOCKET_SERVER_ADDRESS>
           The optional server address for the WebSocket transport
 {CLAP_INDENT}
-          [default: 127.0.0.1:8095]
+          [default: 127.0.0.1:8092]
 
       --websocket-reconnection-max-retries <WEBSOCKET_RECONNECTION_MAX_RETRIES>
           The optional number of max reconnect retries for the WebSocket transport
diff --git a/core/server/src/configs/websocket.rs b/core/server/src/configs/websocket.rs
index 6d8a5b95b..7841edfbe 100644
--- a/core/server/src/configs/websocket.rs
+++ b/core/server/src/configs/websocket.rs
@@ -53,34 +53,34 @@ impl WebSocketConfig {
 
         let mut config = TungsteniteConfig::default();
 
-        if let Some(read_buf_size_str) = &self.read_buffer_size {
-            if let Ok(byte_size) = read_buf_size_str.parse::<IggyByteSize>() {
-                config = config.read_buffer_size(byte_size.as_bytes_u64() as usize);
-            }
+        if let Some(read_buf_size_str) = &self.read_buffer_size
+            && let Ok(byte_size) = read_buf_size_str.parse::<IggyByteSize>()
+        {
+            config = config.read_buffer_size(byte_size.as_bytes_u64() as usize);
         }
 
-        if let Some(write_buf_size_str) = &self.write_buffer_size {
-            if let Ok(byte_size) = write_buf_size_str.parse::<IggyByteSize>() {
-                config = config.write_buffer_size(byte_size.as_bytes_u64() as usize);
-            }
+        if let Some(write_buf_size_str) = &self.write_buffer_size
+            && let Ok(byte_size) = write_buf_size_str.parse::<IggyByteSize>()
+        {
+            config = config.write_buffer_size(byte_size.as_bytes_u64() as usize);
         }
 
-        if let Some(max_write_buf_size_str) = &self.max_write_buffer_size {
-            if let Ok(byte_size) = max_write_buf_size_str.parse::<IggyByteSize>() {
-                config = config.max_write_buffer_size(byte_size.as_bytes_u64() as usize);
-            }
+        if let Some(max_write_buf_size_str) = &self.max_write_buffer_size
+            && let Ok(byte_size) = max_write_buf_size_str.parse::<IggyByteSize>()
+        {
+            config = config.max_write_buffer_size(byte_size.as_bytes_u64() as usize);
         }
 
-        if let Some(msg_size_str) = &self.max_message_size {
-            if let Ok(byte_size) = msg_size_str.parse::<IggyByteSize>() {
-                config = config.max_message_size(Some(byte_size.as_bytes_u64() as usize));
-            }
+        if let Some(msg_size_str) = &self.max_message_size
+            && let Ok(byte_size) = msg_size_str.parse::<IggyByteSize>()
+        {
+            config = config.max_message_size(Some(byte_size.as_bytes_u64() as usize));
         }
 
-        if let Some(frame_size_str) = &self.max_frame_size {
-            if let Ok(byte_size) = frame_size_str.parse::<IggyByteSize>() {
-                config = config.max_frame_size(Some(byte_size.as_bytes_u64() as usize));
-            }
+        if let Some(frame_size_str) = &self.max_frame_size
+            && let Ok(byte_size) = frame_size_str.parse::<IggyByteSize>()
+        {
+            config = config.max_frame_size(Some(byte_size.as_bytes_u64() as usize));
         }
 
         config = config.accept_unmasked_frames(self.accept_unmasked_frames);
diff --git a/core/server/src/websocket/connection_handler.rs b/core/server/src/websocket/connection_handler.rs
index 8f100d8c2..33f7f26b5 100644
--- a/core/server/src/websocket/connection_handler.rs
+++ b/core/server/src/websocket/connection_handler.rs
@@ -69,14 +69,14 @@ pub(crate) async fn handle_connection(
         let length =
             u32::from_le_bytes(initial_buffer[0..INITIAL_BYTES_LENGTH].try_into().unwrap());
         let (res, mut code_buffer_out) = sender.read(code_buffer).await;
-        let _ = res?;
+        res?;
         let code: u32 =
             u32::from_le_bytes(code_buffer_out[0..INITIAL_BYTES_LENGTH].try_into().unwrap());
 
         initial_buffer.clear();
         code_buffer_out.clear();
-        length_buffer = BytesMut::from(initial_buffer);
-        code_buffer = BytesMut::from(code_buffer_out);
+        length_buffer = initial_buffer;
+        code_buffer = code_buffer_out;
 
         debug!("Received a WebSocket request, length: {length}, code: {code}");
         let command = ServerCommand::from_code_and_reader(code, sender, length - 4).await?;
diff --git a/core/server/src/websocket/websocket_listener.rs b/core/server/src/websocket/websocket_listener.rs
index 6d884750b..b3fc5969f 100644
--- a/core/server/src/websocket/websocket_listener.rs
+++ b/core/server/src/websocket/websocket_listener.rs
@@ -92,7 +92,6 @@ async fn accept_loop(
 ) -> Result<(), IggyError> {
     loop {
         let shard = shard.clone();
-        let ws_config = ws_config.clone();
         let accept_future = listener.accept();
 
         futures::select! {
@@ -110,7 +109,7 @@ async fn accept_loop(
                         shard_info!(shard.id, "Accepted new WebSocket connection from: {}", remote_addr);
 
                         let shard_clone = shard.clone();
-                        let ws_config_clone = ws_config.clone();
+                        let ws_config_clone = ws_config;
                         let registry = shard.task_registry.clone();
                         let registry_clone = registry.clone();
 

Reply via email to