This is an automated email from the ASF dual-hosted git repository. piotr pushed a commit to branch mcp in repository https://gitbox.apache.org/repos/asf/iggy.git
commit cfa36dea29fcde12c5da2f1f643a95dd2d96ec8f Author: spetz <[email protected]> AuthorDate: Thu Jul 10 21:57:37 2025 +0200 feat(mcp): add Iggy MCP server --- Cargo.lock | 113 +++++++++++++++- Cargo.toml | 1 + DEPENDENCIES.md | 7 + core/ai/README.md | 3 + core/ai/mcp/Cargo.toml | 23 ++++ core/ai/mcp/README.md | 3 + core/ai/mcp/config.toml | 33 +++++ core/ai/mcp/src/configs.rs | 60 +++++++++ core/ai/mcp/src/error.rs | 35 +++++ core/ai/mcp/src/main.rs | 152 ++++++++++++++++++++++ core/ai/mcp/src/service/consumer_groups.rs | 17 +++ core/ai/mcp/src/service/consumer_offsets.rs | 17 +++ core/ai/mcp/src/service/messages.rs | 17 +++ core/ai/mcp/src/service/mod.rs | 119 +++++++++++++++++ core/ai/mcp/src/service/partitions.rs | 17 +++ core/ai/mcp/src/service/personal_access_tokens.rs | 17 +++ core/ai/mcp/src/service/segments.rs | 17 +++ core/ai/mcp/src/service/streams.rs | 40 ++++++ core/ai/mcp/src/service/system.rs | 17 +++ core/ai/mcp/src/service/topics.rs | 17 +++ core/ai/mcp/src/service/users.rs | 17 +++ core/ai/mcp/src/stream.rs | 75 +++++++++++ 22 files changed, 816 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index a89b2ecf..376eadfb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3808,6 +3808,24 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "iggy-mcp" +version = "0.1.0" +dependencies = [ + "axum 0.8.4", + "config", + "dotenvy", + "figlet-rs", + "iggy", + "rmcp", + "serde", + "strum", + "thiserror 2.0.12", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "iggy_binary_protocol" version = "0.7.0" @@ -6297,6 +6315,50 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "rmcp" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37f2048a81a7ff7e8ef6bc5abced70c3d9114c8f03d85d7aaaafd9fd04f12e9e" +dependencies = [ + "axum 0.8.4", + "base64 0.22.1", + "bytes", + "chrono", + "futures", + "http 1.3.1", + "http-body", + "http-body-util", + "paste", + "pin-project-lite", + "rand 0.9.1", + "rmcp-macros", + "schemars 0.8.22", + "serde", + "serde_json", + "sse-stream", + "thiserror 2.0.12", + "tokio", + "tokio-stream", + "tokio-util", + "tower-service", + "tracing", + "uuid", +] + +[[package]] +name = "rmcp-macros" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72398e694b9f6dbb5de960cf158c8699e6a1854cb5bbaac7de0646b2005763c4" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "serde_json", + "syn 2.0.104", +] + [[package]] name = "ron" version = "0.8.1" @@ -6582,6 +6644,19 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "schemars" +version = "0.8.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fbf2ae1b8bc8e02df939598064d22402220cd5bbcca1c76f7d6a310974d5615" +dependencies = [ + "chrono", + "dyn-clone", + "schemars_derive", + "serde", + "serde_json", +] + [[package]] name = "schemars" version = "0.9.0" @@ -6594,6 +6669,18 @@ dependencies = [ "serde_json", ] +[[package]] +name = "schemars_derive" +version = "0.8.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32e265784ad618884abaea0600a9adf15393368d840e0222d101a072f3f7534d" +dependencies = [ + "proc-macro2", + "quote", + "serde_derive_internals", + "syn 2.0.104", +] + [[package]] name = "scoped-tls" version = "1.0.1" @@ -6713,6 +6800,17 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "serde_derive_internals" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "serde_json" version = "1.0.140" @@ -6767,7 +6865,7 @@ dependencies = [ "hex", "indexmap 1.9.3", "indexmap 2.9.0", - "schemars", + "schemars 0.9.0", "serde", "serde_derive", "serde_json", @@ -7066,6 +7164,19 @@ dependencies = [ "lock_api", ] +[[package]] +name = "sse-stream" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb4dc4d33c68ec1f27d386b5610a351922656e1fdf5c05bbaad930cd1519479a" +dependencies = [ + "bytes", + "futures-util", + "http-body", + "http-body-util", + "pin-project-lite", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" diff --git a/Cargo.toml b/Cargo.toml index f910068b..4c63503f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ codegen-units = 1 [workspace] members = [ "bdd/rust", + "core/ai/mcp", "core/bench", "core/bench/dashboard/frontend", "core/bench/dashboard/server", diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md index 63ebe127..18bb178b 100644 --- a/DEPENDENCIES.md +++ b/DEPENDENCIES.md @@ -335,6 +335,7 @@ iggy: 0.7.0, "Apache-2.0", iggy-bench: 0.2.4, "Apache-2.0", iggy-cli: 0.9.0, "Apache-2.0", iggy-connectors: 0.1.0, "Apache-2.0", +iggy-mcp: 0.1.0, "N/A", iggy_binary_protocol: 0.7.0, "Apache-2.0", iggy_common: 0.7.0, "Apache-2.0", iggy_connector_quickwit_sink: 0.1.0, "Apache-2.0", @@ -570,6 +571,8 @@ retry-policies: 0.4.0, "Apache-2.0 OR MIT", ring: 0.17.14, "Apache-2.0 AND ISC", rkyv: 0.7.45, "MIT", rkyv_derive: 0.7.45, "MIT", +rmcp: 0.2.1, "Apache-2.0 OR MIT", +rmcp-macros: 0.2.1, "Apache-2.0 OR MIT", ron: 0.8.1, "Apache-2.0 OR MIT", route-recognizer: 0.3.1, "MIT", rust-ini: 0.21.1, "MIT", @@ -596,7 +599,9 @@ safe_arch: 0.7.4, "Apache-2.0 OR MIT OR Zlib", same-file: 1.0.6, "MIT OR Unlicense", scc: 2.3.4, "Apache-2.0", schannel: 0.1.27, "MIT", +schemars: 0.8.22, "MIT", schemars: 0.9.0, "MIT", +schemars_derive: 0.8.22, "MIT", scoped-tls: 1.0.1, "Apache-2.0 OR MIT", scopeguard: 1.2.0, "Apache-2.0 OR MIT", sdd: 3.0.8, "Apache-2.0", @@ -610,6 +615,7 @@ serde: 1.0.219, "Apache-2.0 OR MIT", serde-wasm-bindgen: 0.5.0, "MIT", serde-wasm-bindgen: 0.6.5, "MIT", serde_derive: 1.0.219, "Apache-2.0 OR MIT", +serde_derive_internals: 0.29.1, "Apache-2.0 OR MIT", serde_json: 1.0.140, "Apache-2.0 OR MIT", serde_path_to_error: 0.1.17, "Apache-2.0 OR MIT", serde_spanned: 0.6.9, "Apache-2.0 OR MIT", @@ -639,6 +645,7 @@ snafu-derive: 0.8.6, "Apache-2.0 OR MIT", socket2: 0.5.10, "Apache-2.0 OR MIT", spin: 0.9.8, "MIT", spinning_top: 0.3.0, "Apache-2.0 OR MIT", +sse-stream: 0.2.1, "Apache-2.0 OR MIT", stable_deref_trait: 1.2.0, "Apache-2.0 OR MIT", static-toml: 1.3.0, "MIT", static_assertions: 1.1.0, "Apache-2.0 OR MIT", diff --git a/core/ai/README.md b/core/ai/README.md new file mode 100644 index 00000000..e8f06e6d --- /dev/null +++ b/core/ai/README.md @@ -0,0 +1,3 @@ +# Apache Iggy AI + +This module contains the AI-related components of the Apache Iggy message streaming infrastructure. diff --git a/core/ai/mcp/Cargo.toml b/core/ai/mcp/Cargo.toml new file mode 100644 index 00000000..d81bf7f0 --- /dev/null +++ b/core/ai/mcp/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "iggy-mcp" +version = "0.1.0" +edition = "2024" + +[dependencies] +axum = { workspace = true } +config = { workspace = true } +dotenvy = { workspace = true } +figlet-rs = { workspace = true } +iggy = { workspace = true } +rmcp = { version = "0.2.1", features = [ + "server", + "transport-io", + "transport-sse-server", + "transport-streamable-http-server", +] } +serde = { workspace = true } +strum = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } diff --git a/core/ai/mcp/README.md b/core/ai/mcp/README.md new file mode 100644 index 00000000..c8af1619 --- /dev/null +++ b/core/ai/mcp/README.md @@ -0,0 +1,3 @@ +# Apache Iggy MCP Server + +The Model Context Protocol (MCP) is an open protocol that standardizes how applications provide context to LLMs. The Apache Iggy MCP Server is an implementation of the MCP protocol for the message streaming infrastructure. diff --git a/core/ai/mcp/config.toml b/core/ai/mcp/config.toml new file mode 100644 index 00000000..ab06c0ca --- /dev/null +++ b/core/ai/mcp/config.toml @@ -0,0 +1,33 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +transport = "stdio" + +[http_api] # Optional HTTP API configuration +address = "127.0.0.1:8082" +# api_key = "secret" # Optional API key for authentication to be passed as `api-key` header + +[http_api.tls] # Optional TLS configuration for HTTP API +enabled = false +cert_file = "core/certs/iggy_cert.pem" +key_file = "core/certs/iggy_key.pem" + +[iggy] +address = "localhost:8090" +username = "iggy" +password = "iggy" +# token = "secret" # Personal Access Token (PAT) can be used instead of username and password diff --git a/core/ai/mcp/src/configs.rs b/core/ai/mcp/src/configs.rs new file mode 100644 index 00000000..89908a4e --- /dev/null +++ b/core/ai/mcp/src/configs.rs @@ -0,0 +1,60 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use serde::{Deserialize, Serialize}; +use strum::Display; + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct McpServerConfig { + pub http_api: Option<HttpApiConfig>, + pub iggy: IggyConfig, + pub transport: McpTransport, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct IggyConfig { + pub address: String, + pub username: Option<String>, + pub password: Option<String>, + pub token: Option<String>, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct HttpApiConfig { + pub address: String, + pub api_key: Option<String>, + pub tls: Option<HttpTlsConfig>, +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct HttpTlsConfig { + pub enabled: bool, + pub cert_file: String, + pub key_file: String, +} + +#[derive(Clone, Copy, Debug, Default, Display, PartialEq, Eq, Serialize, Deserialize)] +#[strum(serialize_all = "snake_case")] +#[serde(rename_all = "snake_case")] +pub enum McpTransport { + #[default] + #[strum(to_string = "http")] + Http, + #[strum(to_string = "stdio")] + Stdio, +} diff --git a/core/ai/mcp/src/error.rs b/core/ai/mcp/src/error.rs new file mode 100644 index 00000000..b722e575 --- /dev/null +++ b/core/ai/mcp/src/error.rs @@ -0,0 +1,35 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum McpRuntimeError { + #[error("Failed to create service")] + FailedToCreateService, + #[error("Missing configuration")] + MissingConfig, + #[error("Failed to start HTTP server")] + FailedToStartHttpServer, + #[error("Iggy client error")] + IggyClient(#[from] iggy::prelude::ClientError), + #[error("Iggy error")] + IggyError(#[from] iggy::prelude::IggyError), + #[error("Missing Iggy credentials")] + MissingIggyCredentials, +} diff --git a/core/ai/mcp/src/main.rs b/core/ai/mcp/src/main.rs new file mode 100644 index 00000000..59f2ff8c --- /dev/null +++ b/core/ai/mcp/src/main.rs @@ -0,0 +1,152 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use config::{Config, Environment, File}; +use configs::{McpServerConfig, McpTransport}; +use dotenvy::dotenv; +use error::McpRuntimeError; +use figlet_rs::FIGfont; +use rmcp::{ + ServiceExt, + transport::{ + StreamableHttpService, stdio, streamable_http_server::session::local::LocalSessionManager, + }, +}; +use service::IggyService; +use std::{env, sync::Arc}; +use tracing::{error, info}; +use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt, util::SubscriberInitExt}; + +mod configs; +mod error; +mod service; +mod stream; + +#[tokio::main] +async fn main() -> Result<(), McpRuntimeError> { + let standard_font = FIGfont::standard().unwrap(); + let figure = standard_font.convert("Iggy MCP Server"); + eprintln!("{}", figure.unwrap()); + + if let Ok(env_path) = std::env::var("IGGY_MCP_ENV_PATH") { + if dotenvy::from_path(&env_path).is_ok() { + eprintln!("Loaded environment variables from path: {env_path}"); + } + } else if let Ok(path) = dotenv() { + eprintln!( + "Loaded environment variables from .env file at path: {}", + path.display() + ); + } + + let config_path = env::var("IGGY_MCP_CONFIG_PATH").unwrap_or_else(|_| "config".to_string()); + + eprintln!("Loading configuration from: {config_path}"); + + let builder = Config::builder() + .add_source(File::with_name(&config_path)) + .add_source(Environment::with_prefix("IGGY_MCP").separator("_")); + + let config: McpServerConfig = builder + .build() + .expect("Failed to build runtime config") + .try_deserialize() + .expect("Failed to deserialize runtime config"); + + let transport = config.transport; + if transport == McpTransport::Stdio { + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("DEBUG"))) + .with_writer(std::io::stderr) + .with_ansi(false) + .init(); + } else { + Registry::default() + .with(tracing_subscriber::fmt::layer()) + .with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"))) + .init(); + } + + info!("Starting Iggy MCP Server, transport: {transport}..."); + + let iggy_clients = stream::init(config.iggy).await?; + let consumer = Arc::new(iggy_clients.consumer); + let producer = Arc::new(iggy_clients.producer); + + if transport == McpTransport::Stdio { + let Ok(service) = IggyService::new(consumer, producer) + .serve(stdio()) + .await + .inspect_err(|e| { + error!("Serving error: {:?}", e); + }) + else { + error!("Failed to create service"); + return Err(McpRuntimeError::FailedToCreateService); + }; + + if let Err(error) = service.waiting().await { + error!("waiting error: {:?}", error); + } + } else { + let Some(http_config) = config.http_api else { + error!("HTTP API configuration not found"); + return Err(McpRuntimeError::MissingConfig); + }; + + let service = StreamableHttpService::new( + move || Ok(IggyService::new(consumer.clone(), producer.clone())), + LocalSessionManager::default().into(), + Default::default(), + ); + + let router = axum::Router::new().nest_service("/mcp", service); + let tcp_listener = tokio::net::TcpListener::bind(&http_config.address) + .await + .map_err(|error| { + error!("Failed to bind TCP listener: {:?}", error); + McpRuntimeError::FailedToStartHttpServer + })?; + info!("HTTP API listening on: {}", http_config.address); + let _ = axum::serve(tcp_listener, router) + .with_graceful_shutdown(async { tokio::signal::ctrl_c().await.unwrap() }) + .await; + } + + #[cfg(unix)] + let (mut ctrl_c, mut sigterm) = { + use tokio::signal::unix::{SignalKind, signal}; + ( + signal(SignalKind::interrupt()).expect("Failed to create SIGINT signal"), + signal(SignalKind::terminate()).expect("Failed to create SIGTERM signal"), + ) + }; + + #[cfg(unix)] + tokio::select! { + _ = ctrl_c.recv() => { + info!("Received SIGINT. Shutting down Iggy MCP Server..."); + }, + _ = sigterm.recv() => { + info!("Received SIGTERM. Shutting down Iggy MCP Server..."); + } + } + + info!("Iggy MCP Server stopped successfully"); + Ok(()) +} diff --git a/core/ai/mcp/src/service/consumer_groups.rs b/core/ai/mcp/src/service/consumer_groups.rs new file mode 100644 index 00000000..31bd66e6 --- /dev/null +++ b/core/ai/mcp/src/service/consumer_groups.rs @@ -0,0 +1,17 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ diff --git a/core/ai/mcp/src/service/consumer_offsets.rs b/core/ai/mcp/src/service/consumer_offsets.rs new file mode 100644 index 00000000..31bd66e6 --- /dev/null +++ b/core/ai/mcp/src/service/consumer_offsets.rs @@ -0,0 +1,17 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ diff --git a/core/ai/mcp/src/service/messages.rs b/core/ai/mcp/src/service/messages.rs new file mode 100644 index 00000000..31bd66e6 --- /dev/null +++ b/core/ai/mcp/src/service/messages.rs @@ -0,0 +1,17 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ diff --git a/core/ai/mcp/src/service/mod.rs b/core/ai/mcp/src/service/mod.rs new file mode 100644 index 00000000..f135738d --- /dev/null +++ b/core/ai/mcp/src/service/mod.rs @@ -0,0 +1,119 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use std::sync::Arc; + +use iggy::prelude::{Identifier, IggyClient, IggyError}; +use rmcp::{ + ServerHandler, + handler::server::{router::tool::ToolRouter, tool::Parameters}, + model::{CallToolResult, Content, ErrorData, ServerCapabilities, ServerInfo}, + tool, tool_handler, tool_router, +}; +use serde::Serialize; +use streams::GetStream; +use tracing::error; + +mod consumer_groups; +mod consumer_offsets; +mod messages; +mod partitions; +mod personal_access_tokens; +mod segments; +mod streams; +mod system; +mod topics; +mod users; + +#[derive(Debug, Clone)] +pub struct IggyService { + tool_router: ToolRouter<Self>, + service: InternalService, +} + +#[tool_router] +impl IggyService { + pub fn new(consumer: Arc<IggyClient>, producer: Arc<IggyClient>) -> Self { + Self { + tool_router: Self::tool_router(), + service: InternalService { + consumer, + _producer: producer, + }, + } + } + + #[tool(description = "Get streams")] + pub async fn get_streams(&self) -> Result<CallToolResult, ErrorData> { + self.service.get_streams().await + } + + #[tool(description = "Get stream")] + pub async fn get_stream( + &self, + Parameters(GetStream { stream_id }): Parameters<GetStream>, + ) -> Result<CallToolResult, ErrorData> { + self.service.get_stream(id(&stream_id)?).await + } +} + +#[tool_handler] +impl ServerHandler for IggyService { + fn get_info(&self) -> ServerInfo { + ServerInfo { + instructions: Some("Iggy service".into()), + capabilities: ServerCapabilities::builder().enable_tools().build(), + ..Default::default() + } + } +} + +fn id(id: &str) -> Result<Identifier, ErrorData> { + Identifier::from_str_value(id).map_err(|e| { + let message = format!("Failed to parse identifier. {e}"); + error!(message); + ErrorData::invalid_request(message, None) + }) +} + +#[derive(Debug, Clone)] +struct InternalService { + consumer: Arc<IggyClient>, + _producer: Arc<IggyClient>, +} + +impl InternalService { + fn request( + &self, + result: Result<impl Sized + Serialize, IggyError>, + ) -> Result<CallToolResult, ErrorData> { + let result = result.map_err(|e| { + let message = format!("There was an error when invoking the method. {e}"); + error!(message); + ErrorData::invalid_request(message, None) + })?; + + let content = Content::json(result).map_err(|error| { + let message = format!("Failed to serialize result. {error}"); + error!(message); + ErrorData::internal_error(message, None) + })?; + + Ok(CallToolResult::success(vec![content])) + } +} diff --git a/core/ai/mcp/src/service/partitions.rs b/core/ai/mcp/src/service/partitions.rs new file mode 100644 index 00000000..31bd66e6 --- /dev/null +++ b/core/ai/mcp/src/service/partitions.rs @@ -0,0 +1,17 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ diff --git a/core/ai/mcp/src/service/personal_access_tokens.rs b/core/ai/mcp/src/service/personal_access_tokens.rs new file mode 100644 index 00000000..31bd66e6 --- /dev/null +++ b/core/ai/mcp/src/service/personal_access_tokens.rs @@ -0,0 +1,17 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ diff --git a/core/ai/mcp/src/service/segments.rs b/core/ai/mcp/src/service/segments.rs new file mode 100644 index 00000000..31bd66e6 --- /dev/null +++ b/core/ai/mcp/src/service/segments.rs @@ -0,0 +1,17 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ diff --git a/core/ai/mcp/src/service/streams.rs b/core/ai/mcp/src/service/streams.rs new file mode 100644 index 00000000..554c9194 --- /dev/null +++ b/core/ai/mcp/src/service/streams.rs @@ -0,0 +1,40 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use super::InternalService; +use iggy::prelude::{Identifier, StreamClient}; +use rmcp::{ + model::{CallToolResult, ErrorData}, + schemars, +}; + +impl InternalService { + pub async fn get_streams(&self) -> Result<CallToolResult, ErrorData> { + self.request(self.consumer.get_streams().await) + } + + pub async fn get_stream(&self, stream_id: Identifier) -> Result<CallToolResult, ErrorData> { + self.request(self.consumer.get_stream(&stream_id).await) + } +} + +#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] +pub struct GetStream { + #[schemars(description = "stream identifier (numeric or string)")] + pub stream_id: String, +} diff --git a/core/ai/mcp/src/service/system.rs b/core/ai/mcp/src/service/system.rs new file mode 100644 index 00000000..31bd66e6 --- /dev/null +++ b/core/ai/mcp/src/service/system.rs @@ -0,0 +1,17 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ diff --git a/core/ai/mcp/src/service/topics.rs b/core/ai/mcp/src/service/topics.rs new file mode 100644 index 00000000..31bd66e6 --- /dev/null +++ b/core/ai/mcp/src/service/topics.rs @@ -0,0 +1,17 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ diff --git a/core/ai/mcp/src/service/users.rs b/core/ai/mcp/src/service/users.rs new file mode 100644 index 00000000..31bd66e6 --- /dev/null +++ b/core/ai/mcp/src/service/users.rs @@ -0,0 +1,17 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ diff --git a/core/ai/mcp/src/stream.rs b/core/ai/mcp/src/stream.rs new file mode 100644 index 00000000..969f3391 --- /dev/null +++ b/core/ai/mcp/src/stream.rs @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use iggy::prelude::{Client, IggyClient, IggyClientBuilder}; +use tracing::{error, info}; + +use crate::{configs::IggyConfig, error::McpRuntimeError}; + +pub struct IggyClients { + pub producer: IggyClient, + pub consumer: IggyClient, +} + +pub async fn init(config: IggyConfig) -> Result<IggyClients, McpRuntimeError> { + let consumer = create_client(&config).await?; + let producer = create_client(&config).await?; + let iggy_clients = IggyClients { producer, consumer }; + Ok(iggy_clients) +} + +async fn create_client(config: &IggyConfig) -> Result<IggyClient, McpRuntimeError> { + let address = config.address.to_owned(); + let username = config.username.to_owned(); + let password = config.password.to_owned(); + let token = config.token.to_owned(); + + let connection_string = if let Some(token) = token { + if token.is_empty() { + error!("Iggy token cannot be empty (if username and password are not provided)"); + return Err(McpRuntimeError::MissingIggyCredentials); + } + + let redacted_token = token.chars().take(3).collect::<String>(); + info!("Using token: {redacted_token}*** for Iggy authentication"); + format!("iggy://{token}@{address}") + } else { + info!("Using username and password for Iggy authentication"); + let username = username.ok_or(McpRuntimeError::MissingIggyCredentials)?; + if username.is_empty() { + error!("Iggy password cannot be empty (if token is not provided)"); + return Err(McpRuntimeError::MissingIggyCredentials); + } + + let password = password.ok_or(McpRuntimeError::MissingIggyCredentials)?; + if password.is_empty() { + error!("Iggy password cannot be empty (if token is not provided)"); + return Err(McpRuntimeError::MissingIggyCredentials); + } + + let redacted_username = username.chars().take(3).collect::<String>(); + let redacted_password = password.chars().take(3).collect::<String>(); + info!( + "Using username: {redacted_username}***, password: {redacted_password}*** for Iggy authentication" + ); + format!("iggy://{username}:{password}@{address}") + }; + + let client = IggyClientBuilder::from_connection_string(&connection_string)?.build()?; + client.connect().await?; + Ok(client) +}
