This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch mcp
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/mcp by this push:
     new 988e1a36 add test mcp server and first integration test
988e1a36 is described below

commit 988e1a36418034e80007683d4b7cfde5decddf62
Author: spetz <[email protected]>
AuthorDate: Sat Jul 19 22:29:17 2025 +0200

    add test mcp server and first integration test
---
 Cargo.lock                              |  39 +-----
 core/integration/Cargo.toml             |   5 +-
 core/integration/src/test_mcp_server.rs | 202 ++++++++++++++++++++++++++++++++
 core/integration/tests/mcp/mod.rs       |  27 ++++-
 4 files changed, 230 insertions(+), 43 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index c498c2b2..8c7634ea 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4267,7 +4267,9 @@ dependencies = [
  "libc",
  "log",
  "predicates",
+ "rand 0.9.1",
  "rcgen",
+ "reqwest",
  "rmcp",
  "serial_test",
  "server",
@@ -5250,26 +5252,6 @@ dependencies = [
  "libc",
 ]
 
-[[package]]
-name = "oauth2"
-version = "5.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "51e219e79014df21a225b1860a479e2dcd7cbd9130f4defd4bd0e191ea31d67d"
-dependencies = [
- "base64 0.21.7",
- "chrono",
- "getrandom 0.2.16",
- "http 1.3.1",
- "rand 0.8.5",
- "reqwest",
- "serde",
- "serde_json",
- "serde_path_to_error",
- "sha2",
- "thiserror 1.0.69",
- "url",
-]
-
 [[package]]
 name = "objc2-core-foundation"
 version = "0.3.1"
@@ -5977,20 +5959,6 @@ dependencies = [
  "yansi",
 ]
 
-[[package]]
-name = "process-wrap"
-version = "8.2.1"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "a3ef4f2f0422f23a82ec9f628ea2acd12871c81a9362b02c43c1aa86acfc3ba1"
-dependencies = [
- "futures",
- "indexmap 2.9.0",
- "nix",
- "tokio",
- "tracing",
- "windows 0.61.3",
-]
-
 [[package]]
 name = "prokio"
 version = "0.1.0"
@@ -6595,10 +6563,8 @@ dependencies = [
  "http 1.3.1",
  "http-body",
  "http-body-util",
- "oauth2",
  "paste",
  "pin-project-lite",
- "process-wrap",
  "rand 0.9.1",
  "reqwest",
  "rmcp-macros",
@@ -6612,7 +6578,6 @@ dependencies = [
  "tokio-util",
  "tower-service",
  "tracing",
- "url",
  "uuid",
 ]
 
diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml
index 5d8f0d79..eb3ff1ba 100644
--- a/core/integration/Cargo.toml
+++ b/core/integration/Cargo.toml
@@ -45,14 +45,13 @@ lazy_static = { workspace = true }
 libc = "0.2.174"
 log = { workspace = true }
 predicates = { workspace = true }
+rand = { workspace = true }
 rcgen = "0.13.2"
+reqwest = { workspace = true }
 rmcp = { version = "0.3.0", features = [
     "client",
     "reqwest",
     "transport-streamable-http-client",
-    "transport-child-process",
-    "tower",
-    "auth",
 ] }
 serial_test = { workspace = true }
 server = { workspace = true }
diff --git a/core/integration/src/test_mcp_server.rs 
b/core/integration/src/test_mcp_server.rs
index 31bd66e6..e1e297ec 100644
--- a/core/integration/src/test_mcp_server.rs
+++ b/core/integration/src/test_mcp_server.rs
@@ -15,3 +15,205 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
+use assert_cmd::prelude::CommandCargoExt;
+use rand::Rng;
+use rmcp::{
+    RoleClient, ServiceExt,
+    model::{ClientCapabilities, ClientInfo, Implementation, 
InitializeRequestParam},
+    service::RunningService,
+    transport::StreamableHttpClientTransport,
+};
+use std::fs::OpenOptions;
+use std::io::Write;
+use std::net::{Ipv4Addr, SocketAddr};
+use std::path::PathBuf;
+use std::process::{Child, Command};
+use std::time::Duration;
+use std::{collections::HashMap, net::TcpListener};
+use tokio::time::sleep;
+
+const MCP_PATH: &str = "/mcp";
+
+pub type McpClient = RunningService<RoleClient, InitializeRequestParam>;
+
+#[derive(Debug)]
+pub struct TestMcpServer {
+    envs: HashMap<String, String>,
+    child_handle: Option<Child>,
+    server_address: SocketAddr,
+    stdout_file_path: Option<PathBuf>,
+    stderr_file_path: Option<PathBuf>,
+    server_executable_path: Option<String>,
+}
+
+impl TestMcpServer {
+    pub fn with_iggy_address(iggy_tcp_server_address: &str) -> Self {
+        Self::new(iggy_tcp_server_address, None, None)
+    }
+
+    pub fn new(
+        iggy_tcp_server_address: &str,
+        extra_envs: Option<HashMap<String, String>>,
+        server_executable_path: Option<String>,
+    ) -> Self {
+        let mut envs = HashMap::new();
+        if let Some(extra) = extra_envs {
+            for (key, value) in extra {
+                envs.insert(key, value);
+            }
+        }
+
+        envs.insert("IGGY_MCP_HTTP_PATH".to_string(), MCP_PATH.to_string());
+        envs.insert(
+            "IGGY_MCP_IGGY_ADDRESS".to_string(),
+            iggy_tcp_server_address.to_string(),
+        );
+        envs.insert("IGGY_MCP_TRANSPORT".to_string(), "http".to_string());
+        Self::create(envs, server_executable_path)
+    }
+
+    pub fn create(envs: HashMap<String, String>, server_executable_path: 
Option<String>) -> Self {
+        let server_address = Self::get_random_server_address();
+
+        Self {
+            envs,
+            child_handle: None,
+            server_address,
+            stdout_file_path: None,
+            stderr_file_path: None,
+            server_executable_path,
+        }
+    }
+
+    pub fn start(&mut self) {
+        self.envs
+            .entry("IGGY_MCP_HTTP_ADDRESS".to_string())
+            .or_insert(self.server_address.to_string());
+        let mut command = if let Some(server_executable_path) = 
&self.server_executable_path {
+            Command::new(server_executable_path)
+        } else {
+            Command::cargo_bin("iggy-mcp").unwrap()
+        };
+        command.envs(self.envs.clone());
+        let child = command.spawn().unwrap();
+        self.child_handle = Some(child);
+    }
+
+    pub fn stop(&mut self) {
+        #[allow(unused_mut)]
+        if let Some(mut child_handle) = self.child_handle.take() {
+            #[cfg(unix)]
+            unsafe {
+                use libc::SIGTERM;
+                use libc::kill;
+                kill(child_handle.id() as libc::pid_t, SIGTERM);
+            }
+
+            #[cfg(not(unix))]
+            child_handle.kill().unwrap();
+
+            if let Ok(output) = child_handle.wait_with_output() {
+                let stderr = String::from_utf8_lossy(&output.stderr);
+                let stdout = String::from_utf8_lossy(&output.stdout);
+                if let Some(stderr_file_path) = &self.stderr_file_path {
+                    OpenOptions::new()
+                        .append(true)
+                        .create(true)
+                        .open(stderr_file_path)
+                        .unwrap()
+                        .write_all(stderr.as_bytes())
+                        .unwrap();
+                }
+
+                if let Some(stdout_file_path) = &self.stdout_file_path {
+                    OpenOptions::new()
+                        .append(true)
+                        .create(true)
+                        .open(stdout_file_path)
+                        .unwrap()
+                        .write_all(stdout.as_bytes())
+                        .unwrap();
+                }
+            }
+        }
+    }
+
+    pub fn is_started(&self) -> bool {
+        self.child_handle.is_some()
+    }
+
+    pub fn pid(&self) -> u32 {
+        self.child_handle.as_ref().unwrap().id()
+    }
+
+    fn get_http_mcp_api_address(&self) -> String {
+        format!("{}{MCP_PATH}", self.get_http_api_address())
+    }
+
+    fn get_http_api_address(&self) -> String {
+        format!(
+            "http://{}:{}";,
+            self.server_address.ip(),
+            self.server_address.port()
+        )
+    }
+
+    pub async fn ensure_started(&self) {
+        let http_api_address = self.get_http_api_address();
+        let client = reqwest::Client::new();
+        let max_retries = 100;
+        let mut retries = 0;
+        while let Err(error) = client.get(&http_api_address).send().await {
+            sleep(Duration::from_millis(10)).await;
+            retries += 1;
+            if retries >= max_retries {
+                panic!(
+                    "Failed to ping MCP server: {http_api_address} after 
{max_retries} retries. {error}"
+                );
+            }
+        }
+        println!("MCP server address started at: {http_api_address}");
+    }
+
+    pub async fn get_client(&self) -> McpClient {
+        let mcp_http_api_address = self.get_http_mcp_api_address();
+        let transport = 
StreamableHttpClientTransport::from_uri(mcp_http_api_address);
+        let client_info = ClientInfo {
+            protocol_version: Default::default(),
+            capabilities: ClientCapabilities::default(),
+            client_info: Implementation {
+                name: "test-mcp-client".to_string(),
+                version: "1.0.0".to_string(),
+            },
+        };
+        client_info
+            .serve(transport)
+            .await
+            .inspect_err(|error| {
+                eprintln!("MCP client error: {error}");
+            })
+            .expect("Failed to create MCP client")
+    }
+
+    fn get_random_server_address() -> SocketAddr {
+        let mut rng = rand::thread_rng();
+        let max_retries = 10;
+
+        for _ in 0..max_retries {
+            let port = rng.gen_range(49152..=65535);
+            let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), port);
+            if TcpListener::bind(addr).is_ok() {
+                return addr;
+            }
+        }
+
+        panic!("Failed to find a free port after {max_retries} retries");
+    }
+}
+
+impl Drop for TestMcpServer {
+    fn drop(&mut self) {
+        self.stop();
+    }
+}
diff --git a/core/integration/tests/mcp/mod.rs 
b/core/integration/tests/mcp/mod.rs
index 2e79fae5..33d55f9c 100644
--- a/core/integration/tests/mcp/mod.rs
+++ b/core/integration/tests/mcp/mod.rs
@@ -16,11 +16,32 @@
  * under the License.
  */
 
-use integration::test_server::TestServer;
+use integration::{test_mcp_server::TestMcpServer, test_server::TestServer};
 
 #[tokio::test]
-async fn test_mcp() {
+async fn mcp_server_should_list_tools() {
     let mut test_server = TestServer::default();
     test_server.start();
-    let _server_addr = test_server.get_raw_tcp_addr().unwrap();
+    let iggy_server_address = test_server
+        .get_raw_tcp_addr()
+        .expect("Failed to get Iggy TCP address");
+    println!("Iggy server address: {iggy_server_address}");
+    let mut test_mcp_server = 
TestMcpServer::with_iggy_address(&iggy_server_address);
+    test_mcp_server.start();
+    test_mcp_server.ensure_started().await;
+    let client = test_mcp_server.get_client().await;
+    println!("Invoking MCP client");
+
+    let server_info = client.peer_info();
+    println!("Connected to MCP server: {server_info:#?}");
+
+    let tools = client
+        .list_tools(Default::default())
+        .await
+        .expect("Failed to list tools");
+    println!("Available tools: {tools:#?}");
+
+    assert!(!tools.tools.is_empty());
+    let tools_count = tools.tools.len();
+    assert_eq!(tools_count, 40);
 }

Reply via email to