This is an automated email from the ASF dual-hosted git repository.

gongchao pushed a commit to branch feat-streamable-http
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git

commit b31896bdf0848c34172802b619c2eb0a18bd76d3
Author: tomsun28 <[email protected]>
AuthorDate: Sat Dec 6 23:22:38 2025 +0800

    feat: support mcp streamable http protocol and upgrade spring ai version
    
    Signed-off-by: tomsun28 <[email protected]>
---
 hertzbeat-ai/pom.xml                               |   2 +-
 .../ai/config/CustomSseServerTransport.java        | 246 ---------------------
 .../hertzbeat/ai/service/McpServerService.java     |  23 --
 .../impl/ChatClientProviderServiceImpl.java        |   4 +-
 .../ai/service/impl/McpServerServiceImpl.java      |  38 ----
 .../src/main/resources/application.yml             |  15 +-
 script/helm/hertzbeat-helm-chart                   |   2 +-
 7 files changed, 7 insertions(+), 323 deletions(-)

diff --git a/hertzbeat-ai/pom.xml b/hertzbeat-ai/pom.xml
index de91bf7fa9..d026a98b3b 100644
--- a/hertzbeat-ai/pom.xml
+++ b/hertzbeat-ai/pom.xml
@@ -26,7 +26,7 @@
        <artifactId>hertzbeat-ai</artifactId>
        <version>${hertzbeat.version}</version>
        <properties>
-               <spring-ai.version>1.0.3</spring-ai.version>
+               <spring-ai.version>1.1.1</spring-ai.version>
                <java.version>17</java.version>
        </properties>
 
diff --git a/hertzbeat-ai/src/main/java/org/apache/hertzbeat/ai/config/CustomSseServerTransport.java b/hertzbeat-ai/src/main/java/org/apache/hertzbeat/ai/config/CustomSseServerTransport.java
deleted file mode 100644
index 825985e86a..0000000000
--- a/hertzbeat-ai/src/main/java/org/apache/hertzbeat/ai/config/CustomSseServerTransport.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hertzbeat.ai.config;
-
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.usthe.sureness.mgt.SurenessSecurityManager;
-import com.usthe.sureness.subject.SubjectSum;
-import io.modelcontextprotocol.spec.McpError;
-import io.modelcontextprotocol.spec.McpSchema;
-import io.modelcontextprotocol.spec.McpServerSession;
-import io.modelcontextprotocol.spec.McpServerTransport;
-import io.modelcontextprotocol.spec.McpServerTransportProvider;
-import io.modelcontextprotocol.util.Assert;
-import java.io.IOException;
-import java.time.Duration;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-import jakarta.servlet.http.HttpServletRequest;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.http.HttpStatus;
-import org.springframework.web.servlet.function.RouterFunction;
-import org.springframework.web.servlet.function.RouterFunctions;
-import org.springframework.web.servlet.function.ServerRequest;
-import org.springframework.web.servlet.function.ServerResponse;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-/**
- * Custom Server-Sent Events transport provider for Model Context Protocol.
- */
-@Slf4j
-public class CustomSseServerTransport implements McpServerTransportProvider {
-    private final ObjectMapper objectMapper;
-    private final String messageEndpoint;
-    private final String sseEndpoint;
-    private final String baseUrl;
-    @Getter
-    private final RouterFunction<ServerResponse> routerFunction;
-    @Setter
-    private McpServerSession.Factory sessionFactory;
-    private final Map<String, Object> sessionRequest = new HashMap<>();
-    private final ConcurrentHashMap<String, McpServerSession> sessions;
-    private volatile boolean isClosing;
-
-    public CustomSseServerTransport(ObjectMapper objectMapper, String 
messageEndpoint) {
-        this(objectMapper, messageEndpoint, "/sse");
-    }
-
-
-    public CustomSseServerTransport(ObjectMapper objectMapper, String 
messageEndpoint, String sseEndpoint) {
-        this(objectMapper, "", messageEndpoint, sseEndpoint);
-    }
-
-    public CustomSseServerTransport(ObjectMapper objectMapper, String baseUrl, 
String messageEndpoint, String sseEndpoint) {
-        this.sessions = new ConcurrentHashMap();
-        this.isClosing = false;
-        Assert.notNull(objectMapper, "ObjectMapper must not be null");
-        Assert.notNull(baseUrl, "Message base URL must not be null");
-        Assert.notNull(messageEndpoint, "Message endpoint must not be null");
-        Assert.notNull(sseEndpoint, "SSE endpoint must not be null");
-        this.objectMapper = objectMapper;
-        this.baseUrl = baseUrl;
-        this.messageEndpoint = messageEndpoint;
-        this.sseEndpoint = sseEndpoint;
-        this.routerFunction = RouterFunctions.route().GET(this.sseEndpoint, 
this::handleSseConnection).POST(this.messageEndpoint, 
this::handleMessage).build();
-    }
-
-    public Mono<Void> notifyClients(String method, Object params) {
-        if (this.sessions.isEmpty()) {
-            log.debug("No active sessions to broadcast message to");
-            return Mono.empty();
-        } else {
-            log.debug("Attempting to broadcast message to {} active sessions", 
this.sessions.size());
-            return Flux.fromIterable(this.sessions.values())
-                    .flatMap((session) -> session.sendNotification(method, 
params)
-                            .doOnError((e) -> log.error("Failed to send 
message to session {}: {}", session.getId(), e.getMessage()))
-                            .onErrorComplete())
-                    .then();
-        }
-    }
-
-    public Mono<Void> closeGracefully() {
-        return Flux.fromIterable(this.sessions.values()).doFirst(() -> {
-            this.isClosing = true;
-            log.debug("Initiating graceful shutdown with {} active sessions", 
this.sessions.size());
-        }).flatMap(McpServerSession::closeGracefully).then().doOnSuccess((v) 
-> log.debug("Graceful shutdown completed"));
-    }
-
-    private ServerResponse handleSseConnection(ServerRequest request) {
-        log.debug("Handling SSE connection for request: {}", request);
-        HttpServletRequest servletRequest = request.servletRequest();
-
-        try {
-
-            log.debug("Processing SSE connection for servlet request: {}", 
servletRequest);
-            log.debug("Authorization header: {}", 
servletRequest.getHeader("Authorization"));
-
-
-        } catch (Exception e) {
-            log.error("Authentication failed for SSE connection: {}", 
e.getMessage());
-            return 
ServerResponse.status(HttpStatus.UNAUTHORIZED).body("Unauthorized: " + 
e.getMessage());
-        }
-
-
-
-        if (this.isClosing) {
-            return 
ServerResponse.status(HttpStatus.SERVICE_UNAVAILABLE).body("Server is shutting 
down");
-        } else {
-            String sessionId = UUID.randomUUID().toString();
-            log.debug("Generated session ID for SSE connection: {}", 
sessionId);
-            log.debug("Creating new SSE connection for session: {}", 
sessionId);
-
-
-            return ServerResponse.sse((sseBuilder) -> {
-                sseBuilder.onComplete(() -> {
-                    log.debug("SSE connection completed for session: {}", 
sessionId);
-                    this.sessions.remove(sessionId);
-                });
-                sseBuilder.onTimeout(() -> {
-                    log.debug("SSE connection timed out for session: {}", 
sessionId);
-                    this.sessions.remove(sessionId);
-                });
-                CustomSseServerTransport.WebMvcMcpSessionTransport 
sessionTransport = new 
CustomSseServerTransport.WebMvcMcpSessionTransport(sessionId, sseBuilder);
-                McpServerSession session = 
this.sessionFactory.create(sessionTransport);
-                this.sessionRequest.put(sessionId, request.servletRequest());
-                this.sessions.put(sessionId, session);
-
-                try {
-                    
sseBuilder.id(sessionId).event("endpoint").data(this.baseUrl + 
this.messageEndpoint + "?sessionId=" + sessionId);
-                } catch (Exception e) {
-                    log.error("Failed to send initial endpoint event: {}", 
e.getMessage());
-                    sseBuilder.error(e);
-                }
-
-            }, Duration.ZERO);
-
-        }
-    }
-
-    private ServerResponse handleMessage(ServerRequest request) {
-        if (this.isClosing) {
-            return 
ServerResponse.status(HttpStatus.SERVICE_UNAVAILABLE).body("Server is shutting 
down");
-        } else if (request.param("sessionId").isEmpty()) {
-            return ServerResponse.badRequest().body(new McpError("Session ID 
missing in message endpoint"));
-        } else {
-            String sessionId = (String) request.param("sessionId").get();
-            McpServerSession session = (McpServerSession) 
this.sessions.get(sessionId);
-            log.debug("Authorization header for message request: {}", 
request.servletRequest().getHeader("Authorization"));
-            SubjectSum subject = 
SurenessSecurityManager.getInstance().checkIn(sessionRequest.get(sessionId));
-            McpContextHolder.setSubject(subject);
-
-
-            if (session == null) {
-                return ServerResponse.status(HttpStatus.NOT_FOUND).body(new 
McpError("Session not found: " + sessionId));
-            } else {
-                try {
-                    String body = request.body(String.class);
-                    McpSchema.JSONRPCMessage message = 
McpSchema.deserializeJsonRpcMessage(this.objectMapper, body);
-                    session.handle(message).block();
-                    return ServerResponse.ok().build();
-                } catch (IOException | IllegalArgumentException e) {
-                    log.error("Failed to deserialize message: {}", 
((Exception) e).getMessage());
-                    return ServerResponse.badRequest().body(new 
McpError("Invalid message format"));
-                } catch (Exception e) {
-                    log.error("Error handling message: {}", e.getMessage());
-                    return 
ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR).body(new 
McpError(e.getMessage()));
-                }
-            }
-        }
-    }
-
-    private class WebMvcMcpSessionTransport implements McpServerTransport {
-        private final String sessionId;
-        private final ServerResponse.SseBuilder sseBuilder;
-
-        WebMvcMcpSessionTransport(String sessionId, ServerResponse.SseBuilder 
sseBuilder) {
-            this.sessionId = sessionId;
-            this.sseBuilder = sseBuilder;
-            log.debug("Session transport {} initialized with SSE builder", 
sessionId);
-        }
-
-        public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
-            return Mono.fromRunnable(() -> {
-                try {
-                    String jsonText = 
CustomSseServerTransport.this.objectMapper.writeValueAsString(message);
-                    
this.sseBuilder.id(this.sessionId).event("message").data(jsonText);
-                    log.debug("Message sent to session {}", this.sessionId);
-                } catch (Exception e) {
-                    log.error("Failed to send message to session {}: {}", 
this.sessionId, e.getMessage());
-                    this.sseBuilder.error(e);
-                }
-
-            });
-        }
-
-        public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
-            return (T) 
CustomSseServerTransport.this.objectMapper.convertValue(data, typeRef);
-        }
-
-        public Mono<Void> closeGracefully() {
-            return Mono.fromRunnable(() -> {
-                log.debug("Closing session transport: {}", this.sessionId);
-
-                try {
-                    this.sseBuilder.complete();
-                    log.debug("Successfully completed SSE builder for session 
{}", this.sessionId);
-                } catch (Exception e) {
-                    log.warn("Failed to complete SSE builder for session {}: 
{}", this.sessionId, e.getMessage());
-                }
-
-            });
-        }
-
-        public void close() {
-            try {
-                this.sseBuilder.complete();
-                log.debug("Successfully completed SSE builder for session {}", 
this.sessionId);
-            } catch (Exception e) {
-                log.warn("Failed to complete SSE builder for session {}: {}", 
this.sessionId, e.getMessage());
-            }
-
-        }
-    }
-}
diff --git a/hertzbeat-ai/src/main/java/org/apache/hertzbeat/ai/service/McpServerService.java b/hertzbeat-ai/src/main/java/org/apache/hertzbeat/ai/service/McpServerService.java
index 6b518413d2..695a26ae03 100644
--- a/hertzbeat-ai/src/main/java/org/apache/hertzbeat/ai/service/McpServerService.java
+++ b/hertzbeat-ai/src/main/java/org/apache/hertzbeat/ai/service/McpServerService.java
@@ -18,12 +18,7 @@
 
 package org.apache.hertzbeat.ai.service;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.hertzbeat.ai.config.CustomSseServerTransport;
-import org.springframework.ai.mcp.server.autoconfigure.McpServerProperties;
 import org.springframework.ai.tool.ToolCallbackProvider;
-import org.springframework.web.servlet.function.RouterFunction;
-import org.springframework.web.servlet.function.ServerResponse;
 
 /**
  * Service interface for MCP server operations.
@@ -35,22 +30,4 @@ public interface McpServerService {
      * @return ToolCallbackProvider with all HertzBeat monitoring tools
      */
     ToolCallbackProvider hertzbeatTools();
-
-    /**
-     * Provides a custom SSE server transport for the MCP server
-     * @param objectMapper the ObjectMapper instance for JSON serialization
-     * @param serverProperties the properties for the MCP server configuration
-     * @return a CustomSseServerTransport instance configured with the 
provided properties
-     */
-    CustomSseServerTransport webMvcSseServerTransportProvider(
-            ObjectMapper objectMapper,
-            McpServerProperties serverProperties
-    );
-
-    /**
-     * Provides the MCP server router function for web MVC
-     * @param transport Custom SSE server transport
-     * @return RouterFunction for handling MCP server requests
-     */
-    RouterFunction<ServerResponse> 
mvcMcpRouterFunction(CustomSseServerTransport transport);
 }
diff --git a/hertzbeat-ai/src/main/java/org/apache/hertzbeat/ai/service/impl/ChatClientProviderServiceImpl.java b/hertzbeat-ai/src/main/java/org/apache/hertzbeat/ai/service/impl/ChatClientProviderServiceImpl.java
index 38e49c3a35..293edf11a2 100644
--- a/hertzbeat-ai/src/main/java/org/apache/hertzbeat/ai/service/impl/ChatClientProviderServiceImpl.java
+++ b/hertzbeat-ai/src/main/java/org/apache/hertzbeat/ai/service/impl/ChatClientProviderServiceImpl.java
@@ -34,7 +34,6 @@ import org.springframework.ai.chat.messages.Message;
 import org.springframework.ai.chat.messages.UserMessage;
 import org.springframework.ai.tool.ToolCallbackProvider;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.context.ApplicationContext;
 import reactor.core.publisher.Flux;
 
@@ -53,8 +52,7 @@ public class ChatClientProviderServiceImpl implements ChatClientProviderService
     private final ApplicationContext applicationContext;
 
     private final GeneralConfigDao generalConfigDao;
-
-    @Qualifier("hertzbeatTools")
+    
     @Autowired
     private ToolCallbackProvider toolCallbackProvider;
     
diff --git a/hertzbeat-ai/src/main/java/org/apache/hertzbeat/ai/service/impl/McpServerServiceImpl.java b/hertzbeat-ai/src/main/java/org/apache/hertzbeat/ai/service/impl/McpServerServiceImpl.java
index 6af80b1b12..cdb2418397 100644
--- a/hertzbeat-ai/src/main/java/org/apache/hertzbeat/ai/service/impl/McpServerServiceImpl.java
+++ b/hertzbeat-ai/src/main/java/org/apache/hertzbeat/ai/service/impl/McpServerServiceImpl.java
@@ -18,23 +18,17 @@
 
 package org.apache.hertzbeat.ai.service.impl;
 
-import org.apache.hertzbeat.ai.config.CustomSseServerTransport;
 import org.apache.hertzbeat.ai.service.McpServerService;
 import org.apache.hertzbeat.ai.tools.AlertDefineTools;
 import org.apache.hertzbeat.ai.tools.AlertTools;
 import org.apache.hertzbeat.ai.tools.MetricsTools;
 import org.apache.hertzbeat.ai.tools.MonitorTools;
-import org.springframework.ai.mcp.server.autoconfigure.McpServerProperties;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.annotation.Primary;
 import org.springframework.stereotype.Service;
 import org.springframework.ai.tool.ToolCallbackProvider;
 import org.springframework.ai.tool.method.MethodToolCallbackProvider;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.springframework.web.servlet.function.RouterFunction;
-import org.springframework.web.servlet.function.ServerResponse;
 
 /**
  * Implementation of the McpServerService interface.
@@ -56,36 +50,4 @@ public class McpServerServiceImpl implements McpServerService {
     public ToolCallbackProvider hertzbeatTools() {
         return MethodToolCallbackProvider.builder().toolObjects(monitorTools, 
alertTools, alertDefineTools, metricsTools).build();
     }
-    /**
-     * Provides a custom SSE server transport for the MCP server.
-     *
-     * @param objectMapper the ObjectMapper instance for JSON serialization
-     * @param serverProperties the properties for the MCP server configuration
-     * @return a CustomSseServerTransport instance configured with the 
provided properties
-     */
-
-    @Bean
-    public CustomSseServerTransport webMvcSseServerTransportProvider(
-            ObjectMapper objectMapper,
-            McpServerProperties serverProperties
-    ) {
-        return new CustomSseServerTransport(
-                objectMapper,
-                serverProperties.getBaseUrl(),
-                serverProperties.getSseMessageEndpoint(),
-                serverProperties.getSseEndpoint()
-        );
-    }
-    /**
-     * Provides the MCP server transport bean.
-     *
-     * @param transport the custom SSE server transport
-     * @return the MCP server transport instance
-     */
-
-    @Primary
-    @Bean
-    public RouterFunction<ServerResponse> 
mvcMcpRouterFunction(CustomSseServerTransport transport) {
-        return transport.getRouterFunction();
-    }
 }
diff --git a/hertzbeat-startup/src/main/resources/application.yml b/hertzbeat-startup/src/main/resources/application.yml
index ea661edf93..c5d00079ed 100644
--- a/hertzbeat-startup/src/main/resources/application.yml
+++ b/hertzbeat-startup/src/main/resources/application.yml
@@ -24,19 +24,12 @@ spring:
       server:
         enabled: true
         stdio: false
-        name: sse-mcp-server
+        protocol: streamable
+        streamable-http:
+          mcp-endpoint: /api/mcp
+        name: hertzbeat-mcp-server
         version: 1.0.0
-        resource-change-notification: true
-        tool-change-notification: true
-        prompt-change-notification: true
-        sse-endpoint: /api/sse
-        sse-message-endpoint: /api/mcp/message
         type: SYNC
-        capabilities:
-          tool: true
-          resource: true
-          prompt: true
-          completion: true
 
   mvc:
     static-path-pattern: /**
diff --git a/script/helm/hertzbeat-helm-chart b/script/helm/hertzbeat-helm-chart
index 2906774fe8..4a4fbb796e 160000
--- a/script/helm/hertzbeat-helm-chart
+++ b/script/helm/hertzbeat-helm-chart
@@ -1 +1 @@
-Subproject commit 2906774fe83b0f1475f8c467fee9f9c331cb36a9
+Subproject commit 4a4fbb796e9485862c10e38b28a43e604d02284f


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to