This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-a2a.git


The following commit(s) were added to refs/heads/main by this push:
     new b21f1cb  Optimize the RocketMQTransport (#14)
b21f1cb is described below

commit b21f1cb43bf6578c8350e901464fd9f4f7716305
Author: Drizzle <[email protected]>
AuthorDate: Fri Dec 26 17:17:06 2025 +0800

    Optimize the RocketMQTransport (#14)
    
    * update
    
    Change-Id: Icb48afcf44893ad3c07fc0e243e333096d8e3ec0
    
    * optimize the code
    
    Change-Id: Iaa158a2bd2bc55475e7c55e7ae3392b4e2f28fa0
    
    * optimize the code
    
    Change-Id: I9fba77a503e387b5d9feab13cd144021b938ec1f
    
    * update
    
    Change-Id: Ia842ff8df57e018a36c672c9ddc7d1c7f42240f3
    
    * update
    
    Change-Id: Ibfe44c7be6f7a25be6ba74644ac0e9bee9a8ce43
    
    * update
    
    Change-Id: I6ba98dca2dc8001494aec4ddbe4e8bea38f8250a
    
    * update
    
    Change-Id: Ib0f380ba1f073608842c4ad467757368e331bdeb
    
    * update
    
    Change-Id: I3034540314cc832d68a1095fdb5a1366f2b26864
    
    * update
    
    Change-Id: Ie22cf3da240e10ae70800f8c1443134c70c9215f
    
    * optimize the code
    
    Change-Id: I0bc42ef1f8813d4ed538810c27c8e468cda90309
    
    * update
    
    Change-Id: I31f0bfda2a217979be592c533065268f03418951
    
    * update
    
    Change-Id: If3147fde462620d5fa71a71c86a574babcc23589
    
    * update
    
    Change-Id: I5b711a35249a0525a749af7e92bbe4547f9eb910
    
    * update
    
    Change-Id: I6cd9b87e7aeeb85137ba324f79f17b386cf2bef8
    
    * update
    
    Change-Id: I5134715702d491ce02a900e9ca36bc214c20141e
    
    * update
    
    Change-Id: I8031f2fbe0dbd69ec40f23b7a7d6dab3ef53e047
    
    * update
    
    Change-Id: Ic418be82f1e52191d5c2afd7307c64f9962fe943
    
    ---------
    
    Co-authored-by: drizzle.zk <[email protected]>
---
 .../SupervisorAgent-Web/pom.xml                    |   2 +-
 example/rocketmq-multiagent-base-adk/pom.xml       |   2 +-
 pom.xml                                            |   8 +-
 .../rocketmq/a2a/common/RocketMQA2AConstant.java   |  16 +
 .../rocketmq/a2a/common/RocketMQResourceInfo.java  | 113 ++++
 .../apache/rocketmq/a2a/common/RocketMQUtil.java   | 352 +++++++++++++
 .../a2a/server/RocketMQA2AServerRoutes.java        | 285 ++++------
 .../rocketmq/a2a/transport/RocketMQTransport.java  | 577 ++-------------------
 8 files changed, 658 insertions(+), 697 deletions(-)

diff --git a/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/pom.xml 
b/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/pom.xml
index 374381c..2869d06 100644
--- a/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/pom.xml
+++ b/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/pom.xml
@@ -119,7 +119,7 @@
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-a2a</artifactId>
-            <version>1.0.7</version>
+            <version>1.0.8</version>
         </dependency>
     </dependencies>
 
diff --git a/example/rocketmq-multiagent-base-adk/pom.xml 
b/example/rocketmq-multiagent-base-adk/pom.xml
index a92d43f..850a237 100644
--- a/example/rocketmq-multiagent-base-adk/pom.xml
+++ b/example/rocketmq-multiagent-base-adk/pom.xml
@@ -81,7 +81,7 @@
             <dependency>
                 <groupId>org.apache.rocketmq</groupId>
                 <artifactId>rocketmq-a2a</artifactId>
-                <version>1.0.7</version>
+                <version>1.0.8</version>
             </dependency>
             <dependency>
                 <groupId>io.quarkus</groupId>
diff --git a/pom.xml b/pom.xml
index 37b4301..0fea3a5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@
 
     <groupId>org.apache.rocketmq</groupId>
     <artifactId>rocketmq-a2a</artifactId>
-    <version>1.0.8-SNAPSHOT</version>
+    <version>1.0.8</version>
 
     <name>Apache RocketMQ A2A ${project.version}</name>
     <description>Integrate Apache RocketMQ with A2A</description>
@@ -141,6 +141,12 @@
             <artifactId>slf4j-api</artifactId>
             <version>${slfj-version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-a2a</artifactId>
+            <version>3.0.1-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/src/main/java/org/apache/rocketmq/a2a/common/RocketMQA2AConstant.java 
b/src/main/java/org/apache/rocketmq/a2a/common/RocketMQA2AConstant.java
index fd428f0..a55c453 100644
--- a/src/main/java/org/apache/rocketmq/a2a/common/RocketMQA2AConstant.java
+++ b/src/main/java/org/apache/rocketmq/a2a/common/RocketMQA2AConstant.java
@@ -16,7 +16,23 @@
  */
 package org.apache.rocketmq.a2a.common;
 
+import com.fasterxml.jackson.core.type.TypeReference;
+import io.a2a.spec.CancelTaskResponse;
+import io.a2a.spec.GetAuthenticatedExtendedCardResponse;
+import io.a2a.spec.GetTaskPushNotificationConfigResponse;
+import io.a2a.spec.GetTaskResponse;
+import io.a2a.spec.ListTaskPushNotificationConfigResponse;
+import io.a2a.spec.SendMessageResponse;
+import io.a2a.spec.SetTaskPushNotificationConfigResponse;
+
 public class RocketMQA2AConstant {
+    public static final TypeReference<SendMessageResponse> 
SEND_MESSAGE_RESPONSE_REFERENCE = new TypeReference<>() { };
+    public static final TypeReference<GetTaskResponse> 
GET_TASK_RESPONSE_REFERENCE = new TypeReference<>() { };
+    public static final TypeReference<CancelTaskResponse> 
CANCEL_TASK_RESPONSE_REFERENCE = new TypeReference<>() { };
+    public static final TypeReference<GetTaskPushNotificationConfigResponse> 
GET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>() { 
};
+    public static final TypeReference<SetTaskPushNotificationConfigResponse> 
SET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>() { 
};
+    public static final TypeReference<ListTaskPushNotificationConfigResponse> 
LIST_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>() { 
};
+    public static final TypeReference<GetAuthenticatedExtendedCardResponse> 
GET_AUTHENTICATED_EXTENDED_CARD_RESPONSE_REFERENCE = new TypeReference<>() { };
     public static final String HTTP_URL_PREFIX = "http://";;
     public static final String HTTPS_URL_PREFIX = "https://";;
     public static final String ROCKETMQ_PROTOCOL = "RocketMQ";
diff --git 
a/src/main/java/org/apache/rocketmq/a2a/common/RocketMQResourceInfo.java 
b/src/main/java/org/apache/rocketmq/a2a/common/RocketMQResourceInfo.java
new file mode 100644
index 0000000..0ad46cb
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/a2a/common/RocketMQResourceInfo.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.a2a.common;
+
+import java.util.List;
+
+import com.alibaba.fastjson.JSON;
+import io.a2a.spec.AgentCard;
+import io.a2a.spec.AgentInterface;
+import org.apache.rocketmq.shaded.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static 
org.apache.rocketmq.a2a.common.RocketMQA2AConstant.HTTPS_URL_PREFIX;
+import static 
org.apache.rocketmq.a2a.common.RocketMQA2AConstant.HTTP_URL_PREFIX;
+
+/**
+ * RocketMQ coordinates (endpoint, namespace, topic) of an agent, together with
+ * helpers that extract them from an A2A {@link AgentCard}.
+ */
+public class RocketMQResourceInfo {
+    private static final Logger log = LoggerFactory.getLogger(RocketMQResourceInfo.class);
+    private String namespace;
+    private String endpoint;
+    private String topic;
+
+    /**
+     * Creates an instance with the given endpoint and topic; the namespace is left unset.
+     *
+     * @param endpoint the RocketMQ endpoint
+     * @param topic the topic name
+     */
+    public RocketMQResourceInfo(String endpoint, String topic) {
+        this.endpoint = endpoint;
+        this.topic = topic;
+    }
+
+    /** Creates an empty instance; populate it through the setters. */
+    public RocketMQResourceInfo() {}
+
+    public String getEndpoint() {
+        return endpoint;
+    }
+
+    public void setEndpoint(String endpoint) {
+        this.endpoint = endpoint;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public String getNamespace() {
+        return namespace;
+    }
+
+    public void setNamespace(String namespace) {
+        this.namespace = namespace;
+    }
+
+    /**
+     * Resolves the RocketMQ resource information advertised by an agent card.
+     *
+     * <p>The card's own {@code url} is tried first when its preferred transport is RocketMQ;
+     * otherwise (or when that url cannot be parsed) the additional interfaces are scanned for
+     * the first RocketMQ entry whose url parses into a non-empty endpoint and topic.
+     *
+     * @param agentCard the agent card to inspect
+     * @return the parsed resource info, or {@code null} when the card is invalid or advertises
+     *         no usable RocketMQ address
+     */
+    public static RocketMQResourceInfo parseAgentCardAddition(AgentCard agentCard) {
+        if (null == agentCard || StringUtils.isEmpty(agentCard.preferredTransport()) || StringUtils.isEmpty(agentCard.url())) {
+            log.error("parseAgentCardAddition param error, agentCard: {}", JSON.toJSONString(agentCard));
+            return null;
+        }
+        if (RocketMQA2AConstant.ROCKETMQ_PROTOCOL.equals(agentCard.preferredTransport())) {
+            RocketMQResourceInfo preferred = pareAgentCardUrl(agentCard.url());
+            if (isComplete(preferred)) {
+                log.info("RocketMQTransport get rocketMQResourceInfo from preferredTransport");
+                return preferred;
+            }
+        }
+        // The additional interfaces are only needed as a fallback, so their absence is an
+        // error only once the preferred transport has failed to yield an address.
+        List<AgentInterface> agentInterfaces = agentCard.additionalInterfaces();
+        if (null == agentInterfaces || agentInterfaces.isEmpty()) {
+            log.error("parseAgentCardAddition param error, agentCard: {}", JSON.toJSONString(agentCard));
+            return null;
+        }
+        for (AgentInterface agentInterface : agentInterfaces) {
+            if (!RocketMQA2AConstant.ROCKETMQ_PROTOCOL.equals(agentInterface.transport())) {
+                continue;
+            }
+            RocketMQResourceInfo additional = pareAgentCardUrl(agentInterface.url());
+            if (isComplete(additional)) {
+                log.info("RocketMQTransport get rocketMQResourceInfo from additionalInterfaces");
+                return additional;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Parses a url of the form {@code [http://|https://]endpoint/namespace/topic}.
+     *
+     * @param agentCardUrl the url to parse
+     * @return the parsed resource info, or {@code null} when the url is empty or does not
+     *         consist of exactly three {@code /}-separated segments after the scheme is removed
+     */
+    public static RocketMQResourceInfo pareAgentCardUrl(String agentCardUrl) {
+        if (StringUtils.isEmpty(agentCardUrl)) {
+            return null;
+        }
+        String agentUrl = agentCardUrl.replace(HTTP_URL_PREFIX, "");
+        String replaceFinal = agentUrl.replace(HTTPS_URL_PREFIX, "");
+        String[] split = replaceFinal.split("/");
+        if (split.length != 3) {
+            return null;
+        }
+        RocketMQResourceInfo rocketMQResourceInfo = new RocketMQResourceInfo();
+        rocketMQResourceInfo.setEndpoint(split[0].trim());
+        rocketMQResourceInfo.setNamespace(split[1].trim());
+        rocketMQResourceInfo.setTopic(split[2].trim());
+        return rocketMQResourceInfo;
+    }
+
+    /** Returns whether {@code info} is non-null and carries both an endpoint and a topic. */
+    private static boolean isComplete(RocketMQResourceInfo info) {
+        return null != info && !StringUtils.isEmpty(info.getEndpoint()) && !StringUtils.isEmpty(info.getTopic());
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/a2a/common/RocketMQUtil.java 
b/src/main/java/org/apache/rocketmq/a2a/common/RocketMQUtil.java
new file mode 100644
index 0000000..3d469dc
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/a2a/common/RocketMQUtil.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.a2a.common;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import com.alibaba.fastjson.JSON;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import io.a2a.client.transport.jsonrpc.sse.SSEEventListener;
+import io.a2a.client.transport.spi.interceptors.PayloadAndHeaders;
+import io.a2a.spec.A2AClientException;
+import io.a2a.spec.JSONRPCError;
+import io.a2a.spec.JSONRPCResponse;
+import io.a2a.util.Utils;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
+import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.apis.consumer.FilterExpression;
+import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
+import org.apache.rocketmq.client.apis.consumer.LitePushConsumer;
+import org.apache.rocketmq.client.apis.consumer.MessageListener;
+import org.apache.rocketmq.client.apis.consumer.PushConsumer;
+import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.apis.producer.Producer;
+import org.apache.rocketmq.client.apis.producer.ProducerBuilder;
+import org.apache.rocketmq.client.apis.producer.SendReceipt;
+import org.apache.rocketmq.shaded.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static io.a2a.util.Utils.OBJECT_MAPPER;
+import static org.apache.rocketmq.a2a.common.RocketMQA2AConstant.DATA_PREFIX;
+
+public class RocketMQUtil {
+    private static final Logger log = 
LoggerFactory.getLogger(RocketMQUtil.class);
+    /** Lite push consumers, grouped by namespace and keyed by the worker-agent response topic. */
+    public static final ConcurrentMap<String /* namespace */, Map<String /* 
WorkerAgentResponseTopic */, LitePushConsumer>> ROCKETMQ_CONSUMER_MAP = new 
ConcurrentHashMap<>();
+    /** Producers, grouped by namespace and keyed by the agent topic they publish to. */
+    public static final ConcurrentMap<String /* namespace */, Map<String /* 
agentTopic */, Producer>> ROCKETMQ_PRODUCER_MAP = new ConcurrentHashMap<>();
+    /** Futures awaiting a non-stream response, keyed by message id; completed by the consumer listener. */
+    public static final ConcurrentMap<String /* namespace */, Map<String /* 
msgId */, CompletableFuture<String>>> MESSAGE_RESPONSE_MAP = new 
ConcurrentHashMap<>();
+    /** SSE listeners receiving stream responses, keyed by message id. */
+    public static final ConcurrentMap<String /* namespace */, Map<String /* 
msgId */, SSEEventListener>> MESSAGE_STREAM_RESPONSE_MAP = new 
ConcurrentHashMap<>();
+    /** Per-lite-topic flag; when TRUE, stream messages without a registered listener fall back to the recover listener. */
+    public static final ConcurrentMap<String /* namespace */, Map<String /* 
liteTopic */, Boolean>> LITE_TOPIC_USE_DEFAULT_RECOVER_MAP = new 
ConcurrentHashMap<>();
+    /** Fallback SSE listeners used for stream recovery; looked up under RocketMQA2AConstant.DEFAULT_STREAM_RECOVER. */
+    public static final ConcurrentMap<String /* namespace */, Map<String /* 
Key */, SSEEventListener>> RECOVER_MESSAGE_STREAM_RESPONSE_MAP = new 
ConcurrentHashMap<>();
+
+    /**
+     * Verifies that every mandatory RocketMQ configuration value is present.
+     *
+     * <p>Each missing value is reported in its own error log line before the method fails.
+     *
+     * @throws RuntimeException when at least one of the arguments is null or empty
+     */
+    public static void checkConfigParam(String endpoint, String workAgentResponseTopic, String workAgentResponseGroupID, String liteTopic, String agentTopic) {
+        boolean invalid = false;
+        if (StringUtils.isEmpty(endpoint)) {
+            log.error("checkRocketMQConfigParam endpoint is empty");
+            invalid = true;
+        }
+        if (StringUtils.isEmpty(workAgentResponseTopic)) {
+            log.error("checkRocketMQConfigParam workAgentResponseTopic is empty");
+            invalid = true;
+        }
+        if (StringUtils.isEmpty(workAgentResponseGroupID)) {
+            log.error("checkRocketMQConfigParam workAgentResponseGroupID is empty");
+            invalid = true;
+        }
+        if (StringUtils.isEmpty(liteTopic)) {
+            log.error("checkRocketMQConfigParam liteTopic is empty");
+            invalid = true;
+        }
+        if (StringUtils.isEmpty(agentTopic)) {
+            log.error("checkRocketMQConfigParam agentTopic is empty");
+            invalid = true;
+        }
+        if (invalid) {
+            throw new RuntimeException("checkRocketMQConfigParam error, init failed !!!");
+        }
+    }
+
+    /**
+     * Returns the cached producer for {@code agentTopic} in {@code namespace}, building and
+     * caching it on first use.
+     *
+     * @param namespace the RocketMQ namespace (may be empty, must not be null)
+     * @param endpoint the RocketMQ endpoint
+     * @param accessKey the access key passed to the credentials provider
+     * @param secretKey the secret key passed to the credentials provider
+     * @param agentTopic the topic the producer publishes to
+     * @return the producer, or {@code null} when a required argument is missing
+     * @throws RuntimeException wrapping the {@link ClientException} raised while building the producer
+     */
+    public static Producer initAndGetProducer(String namespace, String endpoint, String accessKey, String secretKey, String agentTopic) throws ClientException {
+        if (null == namespace || StringUtils.isEmpty(endpoint) || StringUtils.isEmpty(agentTopic)) {
+            log.error("initAndGetProducer param error, namespace: {}, endpoint: {}, agentTopic: {}", namespace, endpoint, agentTopic);
+            // Bail out here: a null namespace would otherwise hit ConcurrentHashMap.computeIfAbsent and throw NPE.
+            return null;
+        }
+        // The registry is static and shared, so the per-namespace map must be thread-safe too.
+        Map<String, Producer> producerMap = ROCKETMQ_PRODUCER_MAP.computeIfAbsent(namespace, k -> new ConcurrentHashMap<>());
+        return producerMap.computeIfAbsent(agentTopic, k -> {
+            try {
+                return buildProducer(namespace, endpoint, accessKey, secretKey, k);
+            } catch (ClientException e) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    /**
+     * Builds a new producer bound to the given topics.
+     *
+     * @return the producer, or {@code null} when {@code namespace} is null or {@code endpoint} is empty
+     * @throws ClientException when the underlying client fails to build the producer
+     */
+    public static Producer buildProducer(String namespace, String endpoint, String accessKey, String secretKey, String... topics) throws ClientException {
+        boolean paramsValid = null != namespace && !StringUtils.isEmpty(endpoint);
+        if (!paramsValid) {
+            log.error("buildProducer param error, endpoint: {}", endpoint);
+            return null;
+        }
+        ClientServiceProvider serviceProvider = ClientServiceProvider.loadService();
+        SessionCredentialsProvider credentials = new StaticSessionCredentialsProvider(accessKey, secretKey);
+        ClientConfiguration configuration = ClientConfiguration.newBuilder()
+            .setEndpoints(endpoint)
+            .setNamespace(namespace)
+            .setCredentialProvider(credentials)
+            .setRequestTimeout(Duration.ofSeconds(15))
+            .build();
+        ProducerBuilder producerBuilder = serviceProvider.newProducerBuilder();
+        producerBuilder = producerBuilder.setClientConfiguration(configuration);
+        producerBuilder = producerBuilder.setTopics(topics);
+        return producerBuilder.build();
+    }
+
+    /**
+     * Returns the cached lite push consumer for {@code workAgentResponseTopic} in
+     * {@code namespace}, building it on first use, and subscribes it to {@code liteTopic}.
+     *
+     * @return the consumer, or {@code null} when a required argument is missing or the
+     *         consumer could not be built
+     * @throws ClientException when subscribing to the lite topic fails
+     * @throws RuntimeException wrapping the {@link ClientException} raised while building the consumer
+     */
+    public static LitePushConsumer initAndGetConsumer(String namespace, String endpoint, String accessKey, String secretKey, String workAgentResponseTopic, String workAgentResponseGroupID, String liteTopic) throws ClientException {
+        if (null == namespace || StringUtils.isEmpty(endpoint) || StringUtils.isEmpty(workAgentResponseTopic) || StringUtils.isEmpty(workAgentResponseGroupID) || StringUtils.isEmpty(liteTopic)) {
+            log.error("initAndGetConsumer param error, namespace: {}, endpoint: {}, workAgentResponseTopic: {}, " + "workAgentResponseGroupID: {}, liteTopic: {}", namespace, endpoint, workAgentResponseTopic, workAgentResponseGroupID, liteTopic);
+            return null;
+        }
+        // The registry is static and shared, so the per-namespace map must be thread-safe too.
+        Map<String, LitePushConsumer> consumerMap = ROCKETMQ_CONSUMER_MAP.computeIfAbsent(namespace, k -> new ConcurrentHashMap<>());
+        LitePushConsumer litePushConsumer = consumerMap.computeIfAbsent(workAgentResponseTopic, k -> {
+            try {
+                return buildConsumer(endpoint, namespace, accessKey, secretKey, workAgentResponseGroupID, workAgentResponseTopic);
+            } catch (ClientException e) {
+                // Pass the exception as the trailing argument so the stack trace is kept in the log.
+                log.error("RocketMQTransport initRocketMQProducerAndConsumer buildConsumer error: {}", e.getMessage(), e);
+                throw new RuntimeException(e);
+            }
+        });
+        if (null != litePushConsumer) {
+            litePushConsumer.subscribeLite(liteTopic);
+        }
+        return litePushConsumer;
+    }
+
+    //todo
+    /**
+     * Builds a lite push consumer whose listener decodes each message body as a
+     * {@code RocketMQResponse} and dispatches it to the stream or non-stream handler.
+     *
+     * <p>Messages that cannot be handled (missing lite topic, unparsable or incomplete
+     * response, unexpected exception) are acknowledged with {@code SUCCESS}.
+     *
+     * @return the consumer, or {@code null} when endpoint, group or topic is empty
+     * @throws ClientException when the underlying client fails to build the consumer
+     */
+    public static LitePushConsumer buildConsumer(String endpoint, String namespace, String accessKey, String secretKey, String workAgentResponseGroupID, String workAgentResponseTopic) throws ClientException {
+        if (StringUtils.isEmpty(endpoint) || StringUtils.isEmpty(workAgentResponseGroupID) || StringUtils.isEmpty(workAgentResponseTopic)) {
+            log.error("RocketMQTransport buildConsumer check param error");
+            return null;
+        }
+        final ClientServiceProvider provider = ClientServiceProvider.loadService();
+        SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
+        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
+            .setEndpoints(endpoint)
+            .setNamespace(namespace)
+            .setCredentialProvider(sessionCredentialsProvider)
+            .build();
+        return provider.newLitePushConsumerBuilder()
+            .setClientConfiguration(clientConfiguration)
+            .setConsumerGroup(workAgentResponseGroupID)
+            .bindTopic(workAgentResponseTopic)
+            .setMessageListener(messageView -> {
+                try {
+                    // orElse(null) instead of get(): an absent lite topic must reach the
+                    // emptiness check below rather than throw NoSuchElementException.
+                    String liteTopic = messageView.getLiteTopic().orElse(null);
+                    if (StringUtils.isEmpty(liteTopic)) {
+                        log.error("RocketMQTransport buildConsumer liteTopic is empty");
+                        return ConsumeResult.SUCCESS;
+                    }
+                    // Decode the remaining bytes of the body buffer in one step.
+                    String resultStr = StandardCharsets.UTF_8.decode(messageView.getBody()).toString();
+                    RocketMQResponse response = JSON.parseObject(resultStr, RocketMQResponse.class);
+                    if (null == response || StringUtils.isEmpty(response.getMessageId())) {
+                        log.error("RocketMQTransport litePushConsumer consumer error, response is null or messageId is empty");
+                        return ConsumeResult.SUCCESS;
+                    }
+                    if (!response.isStream()) {
+                        return dealNonStreamResult(response, namespace);
+                    }
+                    return dealStreamResult(response, namespace, liteTopic);
+                } catch (Exception e) {
+                    // The trailing throwable keeps the stack trace in the log.
+                    log.error("RocketMQTransport litePushConsumer consumer error, msgId: {}, error: {}", messageView.getMessageId(), e.getMessage(), e);
+                    return ConsumeResult.SUCCESS;
+                }
+            }).build();
+    }
+
+    /**
+     * Builds a push consumer subscribed to every tag ({@code "*"}) of {@code bizTopic}.
+     *
+     * @param messageListener the listener invoked for each delivered message
+     * @return the built push consumer
+     * @throws ClientException when the underlying client fails to build the consumer
+     */
+    public static PushConsumer buildConsumer(String endpoint, String namespace, String accessKey, String secretKey, String bizGroup, String bizTopic, MessageListener messageListener) throws ClientException {
+        ClientServiceProvider serviceProvider = ClientServiceProvider.loadService();
+        SessionCredentialsProvider credentials = new StaticSessionCredentialsProvider(accessKey, secretKey);
+        ClientConfiguration configuration = ClientConfiguration.newBuilder()
+            .setEndpoints(endpoint)
+            .setNamespace(namespace)
+            .setCredentialProvider(credentials)
+            .build();
+        FilterExpression allTags = new FilterExpression("*", FilterExpressionType.TAG);
+        return serviceProvider.newPushConsumerBuilder()
+            .setClientConfiguration(configuration)
+            .setConsumerGroup(bizGroup)
+            .setSubscriptionExpressions(Collections.singletonMap(bizTopic, allTags))
+            .setMessageListener(messageListener)
+            .build();
+    }
+
+    /**
+     * Wraps {@code response}, serialized as JSON, in a message addressed to
+     * {@code topic} / {@code liteTopic}.
+     *
+     * @return the message, or {@code null} when {@code topic} or {@code liteTopic} is empty
+     */
+    public static Message buildMessage(String topic, String liteTopic, RocketMQResponse response) {
+        boolean addressed = !StringUtils.isEmpty(topic) && !StringUtils.isEmpty(liteTopic);
+        if (!addressed) {
+            log.error("RocketMQA2AServerRoutes buildMessage param error, topic: {}, liteTopic: {}, response: {}", topic, liteTopic, JSON.toJSONString(response));
+            return null;
+        }
+        byte[] payload = JSON.toJSONString(response).getBytes(StandardCharsets.UTF_8);
+        ClientServiceProvider serviceProvider = ClientServiceProvider.loadService();
+        return serviceProvider.newMessageBuilder()
+            .setTopic(topic)
+            .setBody(payload)
+            .setLiteTopic(liteTopic)
+            .build();
+    }
+
+    /**
+     * Serializes the payload and headers into a {@code RocketMQRequest} and sends it to
+     * {@code agentTopic} through {@code producer}.
+     *
+     * @return the id of the sent message, or {@code null} when an argument is missing, the
+     *         request cannot be serialized, or sending fails
+     * @throws JsonProcessingException when the payload cannot be written as JSON
+     */
+    public static String sendRocketMQRequest(PayloadAndHeaders payloadAndHeaders, String agentTopic, String liteTopic, String workAgentResponseTopic, Producer producer) throws JsonProcessingException {
+        boolean missingParam = null == payloadAndHeaders
+            || StringUtils.isEmpty(agentTopic)
+            || StringUtils.isEmpty(liteTopic)
+            || StringUtils.isEmpty(workAgentResponseTopic)
+            || null == producer;
+        if (missingParam) {
+            log.error("RocketMQTransport sendRocketMQRequest param error, payloadAndHeaders: {}, agentTopic: {}, workAgentResponseTopic: {}, liteTopic: {}, producer: {}", payloadAndHeaders, agentTopic, workAgentResponseTopic, liteTopic, producer);
+            return null;
+        }
+        RocketMQRequest rocketMQRequest = new RocketMQRequest();
+        rocketMQRequest.setRequestBody(Utils.OBJECT_MAPPER.writeValueAsString(payloadAndHeaders.getPayload()));
+        rocketMQRequest.setAgentTopic(agentTopic);
+        rocketMQRequest.setWorkAgentResponseTopic(workAgentResponseTopic);
+        rocketMQRequest.setLiteTopic(liteTopic);
+        Map<String, String> headers = payloadAndHeaders.getHeaders();
+        if (headers != null) {
+            headers.forEach((name, value) -> rocketMQRequest.addHeader(name, value));
+        }
+        String serialized = serialText(rocketMQRequest);
+        if (StringUtils.isEmpty(serialized)) {
+            return null;
+        }
+        ClientServiceProvider serviceProvider = ClientServiceProvider.loadService();
+        Message outbound = serviceProvider.newMessageBuilder()
+            .setTopic(agentTopic)
+            .setBody(serialized.getBytes(StandardCharsets.UTF_8))
+            .build();
+        try {
+            SendReceipt receipt = producer.send(outbound);
+            String sentMessageId = receipt.getMessageId().toString();
+            if (!StringUtils.isEmpty(sentMessageId)) {
+                return sentMessageId;
+            }
+        } catch (Throwable t) {
+            log.error("sendRocketMQRequest send message failed, error: {}", t.getMessage());
+        }
+        return null;
+    }
+
+    /**
+     * Forwards one stream response chunk to the SSE listener registered for its message id,
+     * falling back to the namespace's default recover listener when the lite topic is
+     * flagged for recovery.
+     *
+     * @param response the decoded response; its body may be empty only on the end marker
+     * @param namespace the namespace whose listener registries are consulted
+     * @param liteTopic the lite topic the message arrived on
+     * @return {@code FAILURE} when the listener throws, otherwise {@code SUCCESS}
+     */
+    private static ConsumeResult dealStreamResult(RocketMQResponse response, String namespace, String liteTopic) {
+        if (null == response || StringUtils.isEmpty(response.getMessageId()) || StringUtils.isEmpty(liteTopic)
+            || (!response.isEnd() && StringUtils.isEmpty(response.getResponseBody()))) {
+            log.error("RocketMQTransport dealStreamResult param is error, response: {}, liteTopic: {}", JSON.toJSONString(response), liteTopic);
+            return ConsumeResult.SUCCESS;
+        }
+        Map<String, SSEEventListener> sseEventListenerMap = MESSAGE_STREAM_RESPONSE_MAP.get(namespace);
+        if (null == sseEventListenerMap) {
+            return ConsumeResult.SUCCESS;
+        }
+        SSEEventListener sseEventListener = sseEventListenerMap.get(response.getMessageId());
+        if (null == sseEventListener) {
+            sseEventListener = lookupRecoverListener(namespace, liteTopic);
+            if (null == sseEventListener) {
+                return ConsumeResult.SUCCESS;
+            }
+        }
+        String item = response.getResponseBody();
+        if (!StringUtils.isEmpty(item) && item.startsWith(DATA_PREFIX)) {
+            // NOTE(review): 5 presumably equals DATA_PREFIX.length() -- confirm against RocketMQA2AConstant.
+            item = item.substring(5).trim();
+            if (!item.isEmpty()) {
+                try {
+                    sseEventListener.onMessage(item, new CompletableFuture<>());
+                } catch (Throwable e) {
+                    log.error("RocketMQTransport dealStreamResult error: {}", e.getMessage());
+                    return ConsumeResult.FAILURE;
+                }
+            }
+        }
+        // Unregister on the end marker even when it carries no data payload; the guard above
+        // accepts an empty body on end, and the listener would otherwise stay registered forever.
+        if (response.isEnd()) {
+            sseEventListenerMap.remove(response.getMessageId());
+        }
+        return ConsumeResult.SUCCESS;
+    }
+
+    /**
+     * Returns the default recover listener of {@code namespace} when {@code liteTopic} is
+     * flagged for default recovery, or {@code null} when recovery does not apply.
+     */
+    private static SSEEventListener lookupRecoverListener(String namespace, String liteTopic) {
+        Map<String, Boolean> recoverFlags = LITE_TOPIC_USE_DEFAULT_RECOVER_MAP.get(namespace);
+        if (null == recoverFlags || !Boolean.TRUE.equals(recoverFlags.get(liteTopic))) {
+            return null;
+        }
+        Map<String, SSEEventListener> recoverListeners = RECOVER_MESSAGE_STREAM_RESPONSE_MAP.get(namespace);
+        if (null == recoverListeners) {
+            return null;
+        }
+        return recoverListeners.get(RocketMQA2AConstant.DEFAULT_STREAM_RECOVER);
+    }
+
+    /**
+     * Completes the future waiting for the given non-stream response, if one is registered.
+     *
+     * @param response the decoded response; must carry a message id and a body
+     * @param namespace the namespace whose pending-future registry is consulted
+     * @return always {@code SUCCESS}; responses nobody is waiting for are dropped
+     */
+    private static ConsumeResult dealNonStreamResult(RocketMQResponse response, String namespace) {
+        if (null == response || StringUtils.isEmpty(response.getMessageId()) || StringUtils.isEmpty(response.getResponseBody())) {
+            log.error("RocketMQTransport dealNonStreamResult param is error, response: {}", JSON.toJSONString(response));
+            return ConsumeResult.SUCCESS;
+        }
+        Map<String, CompletableFuture<String>> completableFutureMap = MESSAGE_RESPONSE_MAP.get(namespace);
+        // Single lookup: containsKey followed by get could observe the entry being removed by
+        // the waiting thread in between and dereference null.
+        CompletableFuture<String> completableFuture = null == completableFutureMap ? null : completableFutureMap.get(response.getMessageId());
+        if (null != completableFuture) {
+            completableFuture.complete(response.getResponseBody());
+        }
+        return ConsumeResult.SUCCESS;
+    }
+
+    /**
+     * Registers a future for {@code responseMessageId} and blocks until the consumer listener
+     * completes it or 120 seconds elapse.
+     *
+     * @param responseMessageId the id of the request message whose response is awaited
+     * @param namespace the namespace the request was sent in
+     * @return the response body
+     * @throws RuntimeException when {@code responseMessageId} is empty
+     * @throws TimeoutException when no response arrives within 120 seconds
+     * @throws ExecutionException when the future completes exceptionally
+     * @throws InterruptedException when the waiting thread is interrupted
+     */
+    public static String getResult(String responseMessageId, String namespace) throws ExecutionException, InterruptedException, TimeoutException {
+        if (StringUtils.isEmpty(responseMessageId)) {
+            throw new RuntimeException("responseMessageId is null");
+        }
+        // Written here and read from the consumer thread, so the inner map must be thread-safe.
+        Map<String, CompletableFuture<String>> completableFutureMap = MESSAGE_RESPONSE_MAP.computeIfAbsent(namespace, k -> new ConcurrentHashMap<>());
+        CompletableFuture<String> objectCompletableFuture = new CompletableFuture<>();
+        completableFutureMap.put(responseMessageId, objectCompletableFuture);
+        try {
+            return objectCompletableFuture.get(120, TimeUnit.SECONDS);
+        } finally {
+            // Always unregister, otherwise a timeout or interruption leaks the pending future.
+            completableFutureMap.remove(responseMessageId);
+        }
+    }
+
+    /**
+     * Deserializes a JSON-RPC response and surfaces an embedded error as an exception.
+     *
+     * @param response the raw JSON response
+     * @param typeReference the concrete response type to deserialize into
+     * @return the deserialized response when it carries no error
+     * @throws A2AClientException when the response contains a JSON-RPC error
+     * @throws JsonProcessingException when the JSON cannot be parsed
+     */
+    public static <T extends JSONRPCResponse<?>> T unmarshalResponse(String response, TypeReference<T> typeReference)
+        throws A2AClientException, JsonProcessingException {
+        T parsed = Utils.unmarshalFrom(response, typeReference);
+        JSONRPCError rpcError = parsed.getError();
+        if (rpcError == null) {
+            return parsed;
+        }
+        String detail = "";
+        if (rpcError.getData() != null) {
+            detail = ": " + rpcError.getData();
+        }
+        throw new A2AClientException(rpcError.getMessage() + detail, rpcError);
+    }
+
+    /**
+     * Serializes {@code o} with the shared object mapper.
+     *
+     * @return the JSON text, or {@code null} when {@code o} is null
+     * @throws RuntimeException wrapping any serialization failure
+     */
+    public static String toJsonString(Object o) {
+        if (null != o) {
+            try {
+                return OBJECT_MAPPER.writeValueAsString(o);
+            } catch (JsonProcessingException ex) {
+                throw new RuntimeException(ex);
+            }
+        }
+        log.error("toJsonString param is null");
+        return null;
+    }
+
+    /**
+     * Serializes a request to JSON after checking that all routing fields are set.
+     *
+     * @return the JSON text, or {@code null} when the request is null or its request body,
+     *         response topic, lite topic or agent topic is empty
+     */
+    public static String serialText(RocketMQRequest rocketMQRequest) {
+        boolean complete = null != rocketMQRequest
+            && !StringUtils.isEmpty(rocketMQRequest.getRequestBody())
+            && !StringUtils.isEmpty(rocketMQRequest.getWorkAgentResponseTopic())
+            && !StringUtils.isEmpty(rocketMQRequest.getLiteTopic())
+            && !StringUtils.isEmpty(rocketMQRequest.getAgentTopic());
+        if (complete) {
+            return JSON.toJSONString(rocketMQRequest);
+        }
+        log.error("serialText param error rocketMQRequest: {}", JSON.toJSONString(rocketMQRequest));
+        return null;
+    }
+}
diff --git 
a/src/main/java/org/apache/rocketmq/a2a/server/RocketMQA2AServerRoutes.java 
b/src/main/java/org/apache/rocketmq/a2a/server/RocketMQA2AServerRoutes.java
index 321bc68..976261c 100644
--- a/src/main/java/org/apache/rocketmq/a2a/server/RocketMQA2AServerRoutes.java
+++ b/src/main/java/org/apache/rocketmq/a2a/server/RocketMQA2AServerRoutes.java
@@ -17,8 +17,6 @@
 package org.apache.rocketmq.a2a.server;
 
 import java.nio.charset.StandardCharsets;
-import java.time.Duration;
-import java.util.Collections;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Flow;
@@ -27,7 +25,9 @@ import 
java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
+
 import com.alibaba.fastjson.JSON;
+
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.io.JsonEOFException;
@@ -70,24 +70,22 @@ import io.vertx.ext.web.RoutingContext;
 import jakarta.annotation.PostConstruct;
 import jakarta.inject.Inject;
 import jakarta.inject.Singleton;
-import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.ClientException;
-import org.apache.rocketmq.client.apis.ClientServiceProvider;
-import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
-import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
 import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
-import org.apache.rocketmq.client.apis.consumer.FilterExpression;
-import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
+import org.apache.rocketmq.client.apis.consumer.MessageListener;
 import org.apache.rocketmq.client.apis.consumer.PushConsumer;
-import org.apache.rocketmq.client.apis.message.Message;
 import org.apache.rocketmq.client.apis.producer.Producer;
-import org.apache.rocketmq.client.apis.producer.ProducerBuilder;
 import org.apache.rocketmq.client.apis.producer.SendReceipt;
 import org.apache.rocketmq.shaded.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import static io.a2a.util.Utils.OBJECT_MAPPER;
 import static org.apache.rocketmq.a2a.common.RocketMQA2AConstant.METHOD;
+import static org.apache.rocketmq.a2a.common.RocketMQUtil.buildConsumer;
+import static org.apache.rocketmq.a2a.common.RocketMQUtil.buildMessage;
+import static org.apache.rocketmq.a2a.common.RocketMQUtil.buildProducer;
+import static org.apache.rocketmq.a2a.common.RocketMQUtil.toJsonString;
 
 @Startup
 @Singleton
@@ -99,159 +97,115 @@ public class RocketMQA2AServerRoutes extends 
A2AServerRoutes {
     private static final String BIZ_CONSUMER_GROUP = 
System.getProperty("bizConsumerGroup", "");
     private static final String ACCESS_KEY = System.getProperty("rocketMQAK", 
"");
     private static final String SECRET_KEY = System.getProperty("rocketMQSK", 
"");
-
-    @Inject
-    JSONRPCHandler jsonRpcHandler;
-
     private static volatile Runnable 
streamingMultiSseSupportSubscribedRunnable;
-
-    private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
-        6,
-        6,
-        60, TimeUnit.SECONDS,
-        new ArrayBlockingQueue<>(10_0000),
-        new CallerRunsPolicy()
-    );
-
+    private final ThreadPoolExecutor executor = new ThreadPoolExecutor(6, 6, 
60, TimeUnit.SECONDS,
+        new ArrayBlockingQueue<>(10_0000), new CallerRunsPolicy());
     private Producer producer;
     private PushConsumer pushConsumer;
     private MultiSseSupport multiSseSupport;
 
+    @Inject
+    JSONRPCHandler jsonRpcHandler;
+
     @PostConstruct
     public void init() {
         try {
             checkConfigParam();
-            this.producer = buildProducer();
-            this.pushConsumer = buildConsumer();
+            this.producer = buildProducer(ROCKETMQ_NAMESPACE, 
ROCKETMQ_ENDPOINT, ACCESS_KEY, SECRET_KEY);
+            this.pushConsumer = buildConsumer(ROCKETMQ_ENDPOINT, 
ROCKETMQ_NAMESPACE, ACCESS_KEY, SECRET_KEY,
+                BIZ_CONSUMER_GROUP, BIZ_TOPIC, buildMessageListener());
             this.multiSseSupport = new MultiSseSupport(this.producer);
             log.info("RocketMQA2AServerRoutes init success");
         } catch (Exception e) {
             log.error("RocketMQA2AServerRoutes error: {}", e.getMessage());
         }
     }
-    private Producer buildProducer() throws ClientException {
-        final ClientServiceProvider provider = 
ClientServiceProvider.loadService();
-        SessionCredentialsProvider sessionCredentialsProvider = new 
StaticSessionCredentialsProvider(ACCESS_KEY, SECRET_KEY);
-        ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder()
-            .setEndpoints(ROCKETMQ_ENDPOINT)
-            .setNamespace(ROCKETMQ_NAMESPACE)
-            .setCredentialProvider(sessionCredentialsProvider)
-            .setRequestTimeout(Duration.ofSeconds(15))
-            .build();
-        final ProducerBuilder builder = 
provider.newProducerBuilder().setClientConfiguration(clientConfiguration);
-        return builder.build();
-    }
 
-    private PushConsumer buildConsumer() throws ClientException {
-        final ClientServiceProvider provider = 
ClientServiceProvider.loadService();
-        SessionCredentialsProvider sessionCredentialsProvider = new 
StaticSessionCredentialsProvider(ACCESS_KEY, SECRET_KEY);
-        ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder()
-            .setEndpoints(ROCKETMQ_ENDPOINT)
-            .setNamespace(ROCKETMQ_NAMESPACE)
-            .setCredentialProvider(sessionCredentialsProvider)
-            .build();
-        String tag = "*";
-        FilterExpression filterExpression = new FilterExpression(tag, 
FilterExpressionType.TAG);
-        PushConsumer consumer = provider.newPushConsumerBuilder()
-            .setClientConfiguration(clientConfiguration)
-            .setConsumerGroup(BIZ_CONSUMER_GROUP)
-            .setSubscriptionExpressions(Collections.singletonMap(BIZ_TOPIC, 
filterExpression))
-            .setMessageListener(messageView -> {
-                CompletableFuture<Boolean> completableFuture = null;
+    private MessageListener buildMessageListener() {
+        return messageView -> {
+            CompletableFuture<Boolean> completableFuture = null;
+            try {
+                byte[] result = new byte[messageView.getBody().remaining()];
+                messageView.getBody().get(result);
+                String messageStr = new String(result, StandardCharsets.UTF_8);
+                RocketMQRequest request = JSON.parseObject(messageStr, 
RocketMQRequest.class);
+                boolean streaming = false;
+                String body = request.getRequestBody();
+                JSONRPCResponse<?> nonStreamingResponse = null;
+                Multi<? extends JSONRPCResponse<?>> streamingResponse = null;
+                JSONRPCErrorResponse error = null;
                 try {
-                    byte[] result = new 
byte[messageView.getBody().remaining()];
-                    messageView.getBody().get(result);
-                    String messageStr = new String(result, 
StandardCharsets.UTF_8);
-                    RocketMQRequest request = JSON.parseObject(messageStr, 
RocketMQRequest.class);
-                    boolean streaming = false;
-                    String body = request.getRequestBody();
-                    JSONRPCResponse<?> nonStreamingResponse = null;
-                    Multi<? extends JSONRPCResponse<?>> streamingResponse = 
null;
-                    JSONRPCErrorResponse error = null;
-                    try {
-                        JsonNode node = OBJECT_MAPPER.readTree(body);
-                        JsonNode method = node != null ? node.get(METHOD) : 
null;
-                        streaming = method != null && 
(SendStreamingMessageRequest.METHOD.equals(method.asText()) || 
TaskResubscriptionRequest.METHOD.equals(method.asText()));
-                        if (streaming) {
-                            StreamingJSONRPCRequest<?> streamingJSONRPCRequest 
= OBJECT_MAPPER.treeToValue(node, StreamingJSONRPCRequest.class);
-                            streamingResponse = 
processStreamingRequest(streamingJSONRPCRequest, null);
-                        } else {
-                            NonStreamingJSONRPCRequest<?> 
nonStreamingJSONRPCRequest = OBJECT_MAPPER.treeToValue(node, 
NonStreamingJSONRPCRequest.class);
-                            nonStreamingResponse = 
processNonStreamingRequest(nonStreamingJSONRPCRequest, null);
-                        }
-                    } catch (JsonProcessingException e) {
-                        error = handleError(e);
-                    } catch (Throwable t) {
-                        error = new JSONRPCErrorResponse(new 
InternalError(t.getMessage()));
-                    } finally {
-                        RocketMQResponse response = null;
-                        if (error != null) {
-                            response = new RocketMQResponse();
-                            response.setEnd(true);
-                            response.setStream(false);
-                            response.setLiteTopic(request.getLiteTopic());
-                            response.setContextId(response.getContextId());
-                            response.setResponseBody(JSON.toJSONString(error));
-                            
response.setMessageId(messageView.getMessageId().toString());
-                        } else if (streaming) {
-                            final Multi<? extends JSONRPCResponse<?>> 
finalStreamingResponse = streamingResponse;
-                            log.info("RocketMQA2AServerRoutes streaming 
finalStreamingResponse: {}", JSON.toJSONString(finalStreamingResponse));
-                            completableFuture = new CompletableFuture<>();
-                            CompletableFuture<Boolean> finalCompletableFuture 
= completableFuture;
-                            this.executor.execute(() -> {
-                                
this.multiSseSupport.subscribeObjectRocketmq(finalStreamingResponse.map(i -> 
(Object)i), null, request.getWorkAgentResponseTopic(), request.getLiteTopic(), 
messageView.getMessageId().toString(), finalCompletableFuture);
-                            });
-                        } else {
-                            response = new RocketMQResponse();
-                            response.setEnd(true);
-                            response.setStream(false);
-                            response.setLiteTopic(request.getLiteTopic());
-                            response.setContextId(response.getContextId());
-                            
response.setMessageId(messageView.getMessageId().toString());
-                            
response.setResponseBody(toJsonString(nonStreamingResponse));
-                        }
-                        if (null != response) {
-                            SendReceipt send = 
this.producer.send(buildMessage(request.getWorkAgentResponseTopic(), 
request.getLiteTopic(), response));
-                            log.info("RocketMQA2AServerRoutes send 
nonStreamingResponse success, msgId: {}, time: {}, response: {}", 
send.getMessageId(), System.currentTimeMillis(), JSON.toJSONString(response));
-                        }
+                    JsonNode node = OBJECT_MAPPER.readTree(body);
+                    JsonNode method = node != null ? node.get(METHOD) : null;
+                    streaming = method != null && 
(SendStreamingMessageRequest.METHOD.equals(method.asText())
+                        || 
TaskResubscriptionRequest.METHOD.equals(method.asText()));
+                    if (streaming) {
+                        StreamingJSONRPCRequest<?> streamingJSONRPCRequest = 
OBJECT_MAPPER.treeToValue(node,
+                            StreamingJSONRPCRequest.class);
+                        streamingResponse = 
processStreamingRequest(streamingJSONRPCRequest, null);
+                    } else {
+                        NonStreamingJSONRPCRequest<?> 
nonStreamingJSONRPCRequest = OBJECT_MAPPER.treeToValue(node,
+                            NonStreamingJSONRPCRequest.class);
+                        nonStreamingResponse = 
processNonStreamingRequest(nonStreamingJSONRPCRequest, null);
+                    }
+                } catch (JsonProcessingException e) {
+                    error = handleError(e);
+                } catch (Throwable t) {
+                    error = new JSONRPCErrorResponse(new 
InternalError(t.getMessage()));
+                } finally {
+                    RocketMQResponse response = null;
+                    if (error != null) {
+                        response = new RocketMQResponse();
+                        response.setEnd(true);
+                        response.setStream(false);
+                        response.setLiteTopic(request.getLiteTopic());
+                        response.setResponseBody(JSON.toJSONString(error));
+                        
response.setMessageId(messageView.getMessageId().toString());
+                    } else if (streaming) {
+                        final Multi<? extends JSONRPCResponse<?>> 
finalStreamingResponse = streamingResponse;
+                        log.info("RocketMQA2AServerRoutes streaming 
finalStreamingResponse: {}",
+                            JSON.toJSONString(finalStreamingResponse));
+                        completableFuture = new CompletableFuture<>();
+                        CompletableFuture<Boolean> finalCompletableFuture = 
completableFuture;
+                        this.executor.execute(() -> {
+                            
this.multiSseSupport.subscribeObjectRocketmq(finalStreamingResponse.map(i -> 
(Object)i),
+                                null, request.getWorkAgentResponseTopic(), 
request.getLiteTopic(),
+                                messageView.getMessageId().toString(), 
finalCompletableFuture);
+                        });
+                    } else {
+                        response = new RocketMQResponse();
+                        response.setEnd(true);
+                        response.setStream(false);
+                        response.setLiteTopic(request.getLiteTopic());
+                        
response.setMessageId(messageView.getMessageId().toString());
+                        
response.setResponseBody(toJsonString(nonStreamingResponse));
+                    }
+                    if (null != response) {
+                        SendReceipt send = 
this.producer.send(buildMessage(request.getWorkAgentResponseTopic(), 
request.getLiteTopic(), response));
+                        log.info("RocketMQA2AServerRoutes send 
nonStreamingResponse success, msgId: {}, time: {}, " + "response: {}", 
send.getMessageId(), System.currentTimeMillis(), JSON.toJSONString(response));
                     }
-                } catch (Exception e) {
-                    log.error("RocketMQA2AServerRoutes error: {}", 
e.getMessage());
-                    return ConsumeResult.FAILURE;
                 }
-                if (null != completableFuture) {
-                    try {
-                        Boolean streamResult = completableFuture.get(15, 
TimeUnit.MINUTES);
-                        if (null != streamResult && streamResult) {
-                            log.info("RocketMQA2AServerRoutes deal msg 
success");
-                            return ConsumeResult.SUCCESS;
-                        } else {
-                            log.info("RocketMQA2AServerRoutes deal msg 
failed");
-                            return ConsumeResult.FAILURE;
-                        }
-                    } catch (Exception e) {
-                        log.error("RocketMQA2AServerRoutes error: {}", 
e.getMessage());
+            } catch (Exception e) {
+                log.error("RocketMQA2AServerRoutes error: {}", e.getMessage());
+                return ConsumeResult.FAILURE;
+            }
+            if (null != completableFuture) {
+                try {
+                    Boolean streamResult = completableFuture.get(15, 
TimeUnit.MINUTES);
+                    if (null != streamResult && streamResult) {
+                        log.info("RocketMQA2AServerRoutes deal msg success");
+                        return ConsumeResult.SUCCESS;
+                    } else {
+                        log.info("RocketMQA2AServerRoutes deal msg failed");
                         return ConsumeResult.FAILURE;
                     }
+                } catch (Exception e) {
+                    log.error("RocketMQA2AServerRoutes error: {}", 
e.getMessage());
+                    return ConsumeResult.FAILURE;
                 }
-                return ConsumeResult.SUCCESS;
-            }).build();
-        return consumer;
-    }
-
-    private static Message buildMessage(String topic, String liteTopic, 
RocketMQResponse response) {
-        if (StringUtils.isEmpty(topic) || StringUtils.isEmpty(liteTopic)) {
-            log.error("RocketMQA2AServerRoutes buildMessage param error, 
topic: {}, liteTopic: {}, response: {}", topic, liteTopic, 
JSON.toJSONString(response));
-            return null;
-        }
-        String missionJsonStr = JSON.toJSONString(response);
-        final ClientServiceProvider provider = 
ClientServiceProvider.loadService();
-        final Message message = provider.newMessageBuilder()
-            .setTopic(topic)
-            .setBody(missionJsonStr.getBytes(StandardCharsets.UTF_8))
-            .setLiteTopic(liteTopic)
-            .build();
-        return message;
+            }
+            return ConsumeResult.SUCCESS;
+        };
     }
 
     private JSONRPCErrorResponse handleError(JsonProcessingException 
exception) {
@@ -276,8 +230,8 @@ public class RocketMQA2AServerRoutes extends 
A2AServerRoutes {
         return new JSONRPCErrorResponse(id, jsonRpcError);
     }
 
-    private JSONRPCResponse<?> processNonStreamingRequest(
-        NonStreamingJSONRPCRequest<?> request, ServerCallContext context) {
+    private JSONRPCResponse<?> 
processNonStreamingRequest(NonStreamingJSONRPCRequest<?> request,
+        ServerCallContext context) {
         if (request instanceof GetTaskRequest req) {
             return jsonRpcHandler.onGetTask(req, context);
         } else if (request instanceof CancelTaskRequest req) {
@@ -299,8 +253,8 @@ public class RocketMQA2AServerRoutes extends 
A2AServerRoutes {
         }
     }
 
-    private Multi<? extends JSONRPCResponse<?>> processStreamingRequest(
-        JSONRPCRequest<?> request, ServerCallContext context) {
+    private Multi<? extends JSONRPCResponse<?>> 
processStreamingRequest(JSONRPCRequest<?> request,
+        ServerCallContext context) {
         Flow.Publisher<? extends JSONRPCResponse<?>> publisher;
         if (request instanceof SendStreamingMessageRequest req) {
             publisher = jsonRpcHandler.onMessageSendStream(req, context);
@@ -326,9 +280,12 @@ public class RocketMQA2AServerRoutes extends 
A2AServerRoutes {
         private MultiSseSupport(Producer producer) {
             this.producer = producer;
         }
-        public void writeRocketmq(Multi<Buffer> multi, RoutingContext rc, 
String workAgentResponseTopic, String liteTopic, String msgId, 
CompletableFuture<Boolean> completableFuture) {
+
+        public void writeRocketmq(Multi<Buffer> multi, RoutingContext rc, 
String workAgentResponseTopic,
+            String liteTopic, String msgId, CompletableFuture<Boolean> 
completableFuture) {
             multi.subscribe().withSubscriber(new Flow.Subscriber<Buffer>() {
                 Flow.Subscription upstream;
+
                 @Override
                 public void onSubscribe(Flow.Subscription subscription) {
                     this.upstream = subscription;
@@ -342,15 +299,11 @@ public class RocketMQA2AServerRoutes extends 
A2AServerRoutes {
                 @Override
                 public void onNext(Buffer item) {
                     try {
-                        RocketMQResponse response = new RocketMQResponse();
-                        response.setEnd(false);
-                        response.setStream(true);
-                        response.setLiteTopic(liteTopic);
-                        response.setContextId(response.getContextId());
-                        response.setMessageId(msgId);
-                        response.setResponseBody(item.toString());
+                        RocketMQResponse response = new 
RocketMQResponse(liteTopic, null, item.toString(), msgId, true,
+                            false);
                         SendReceipt send = 
producer.send(buildMessage(workAgentResponseTopic, liteTopic, response));
-                        log.info("MultiSseSupport send response success, 
msgId: {}, time: {}, response: {}", send.getMessageId(), 
System.currentTimeMillis(), JSON.toJSONString(response));
+                        log.info("MultiSseSupport send response success, 
msgId: {}, time: {}", send.getMessageId(),
+                            System.currentTimeMillis(), 
JSON.toJSONString(response));
                     } catch (Exception e) {
                         log.error("MultiSseSupport send stream error, {}", 
e.getMessage());
                     }
@@ -365,15 +318,11 @@ public class RocketMQA2AServerRoutes extends 
A2AServerRoutes {
 
                 @Override
                 public void onComplete() {
+                    RocketMQResponse response = new 
RocketMQResponse(liteTopic, null, null, msgId, true, true);
                     try {
-                        RocketMQResponse response = new RocketMQResponse();
-                        response.setEnd(true);
-                        response.setStream(true);
-                        response.setLiteTopic(liteTopic);
-                        response.setContextId(response.getContextId());
-                        response.setMessageId(msgId);
                         SendReceipt send = 
producer.send(buildMessage(workAgentResponseTopic, liteTopic, response));
-                        log.info("MultiSseSupport send response success, 
msgId: {}, time: {}, response: {}", send.getMessageId(), 
System.currentTimeMillis(), JSON.toJSONString(response));
+                        log.info("MultiSseSupport send response success, 
msgId: {}, time: {}, response: {}",
+                            send.getMessageId(), System.currentTimeMillis(), 
JSON.toJSONString(response));
                     } catch (ClientException e) {
                         log.error("MultiSseSupport error send complete, msgId: 
{}", e.getMessage());
                     }
@@ -382,7 +331,8 @@ public class RocketMQA2AServerRoutes extends 
A2AServerRoutes {
             });
         }
 
-        public void subscribeObjectRocketmq(Multi<Object> multi, 
RoutingContext rc, String workAgentResponseTopic, String liteTopic, String 
msgId, CompletableFuture<Boolean> completableFuture) {
+        public void subscribeObjectRocketmq(Multi<Object> multi, 
RoutingContext rc, String workAgentResponseTopic,
+            String liteTopic, String msgId, CompletableFuture<Boolean> 
completableFuture) {
             AtomicLong count = new AtomicLong();
             Multi<Buffer> map = multi.map(new Function<Object, Buffer>() {
                 @Override
@@ -400,16 +350,9 @@ public class RocketMQA2AServerRoutes extends 
A2AServerRoutes {
         }
     }
 
-    private static String toJsonString(Object o) {
-        try {
-            return OBJECT_MAPPER.writeValueAsString(o);
-        } catch (JsonProcessingException ex) {
-            throw new RuntimeException(ex);
-        }
-    }
-
     private void checkConfigParam() {
-        if (StringUtils.isEmpty(ROCKETMQ_ENDPOINT) || 
StringUtils.isEmpty(BIZ_TOPIC) || StringUtils.isEmpty(BIZ_CONSUMER_GROUP)) {
+        if (StringUtils.isEmpty(ROCKETMQ_ENDPOINT) || 
StringUtils.isEmpty(BIZ_TOPIC) || StringUtils.isEmpty(
+            BIZ_CONSUMER_GROUP)) {
             if (StringUtils.isEmpty(ROCKETMQ_ENDPOINT)) {
                 log.error("rocketMQEndpoint is empty");
             }
diff --git 
a/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransport.java 
b/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransport.java
index f47db2e..3daa4c4 100644
--- a/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransport.java
+++ b/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransport.java
@@ -16,26 +16,15 @@
  */
 package org.apache.rocketmq.a2a.transport;
 
-import java.nio.charset.StandardCharsets;
-import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import javax.annotation.Nullable;
-import com.alibaba.fastjson.JSON;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
 import io.a2a.client.transport.spi.ClientTransport;
-import org.apache.rocketmq.a2a.common.RocketMQRequest;
-import org.apache.rocketmq.a2a.common.RocketMQResponse;
+import org.apache.rocketmq.a2a.common.RocketMQResourceInfo;
 import org.apache.rocketmq.a2a.common.RocketMQA2AConstant;
 import io.a2a.client.http.A2ACardResolver;
 import io.a2a.client.http.A2AHttpClient;
@@ -46,12 +35,10 @@ import 
io.a2a.client.transport.spi.interceptors.PayloadAndHeaders;
 import io.a2a.spec.A2AClientError;
 import io.a2a.spec.A2AClientException;
 import io.a2a.spec.AgentCard;
-import io.a2a.spec.AgentInterface;
 import io.a2a.spec.CancelTaskRequest;
 import io.a2a.spec.CancelTaskResponse;
 import io.a2a.spec.DeleteTaskPushNotificationConfigParams;
 import io.a2a.spec.DeleteTaskPushNotificationConfigRequest;
-import io.a2a.spec.DeleteTaskPushNotificationConfigResponse;
 import io.a2a.spec.EventKind;
 import io.a2a.spec.GetAuthenticatedExtendedCardRequest;
 import io.a2a.spec.GetAuthenticatedExtendedCardResponse;
@@ -60,9 +47,7 @@ import io.a2a.spec.GetTaskPushNotificationConfigRequest;
 import io.a2a.spec.GetTaskPushNotificationConfigResponse;
 import io.a2a.spec.GetTaskRequest;
 import io.a2a.spec.GetTaskResponse;
-import io.a2a.spec.JSONRPCError;
 import io.a2a.spec.JSONRPCMessage;
-import io.a2a.spec.JSONRPCResponse;
 import io.a2a.spec.ListTaskPushNotificationConfigParams;
 import io.a2a.spec.ListTaskPushNotificationConfigRequest;
 import io.a2a.spec.ListTaskPushNotificationConfigResponse;
@@ -77,43 +62,33 @@ import io.a2a.spec.Task;
 import io.a2a.spec.TaskIdParams;
 import io.a2a.spec.TaskPushNotificationConfig;
 import io.a2a.spec.TaskQueryParams;
-import io.a2a.util.Utils;
-import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.ClientException;
-import org.apache.rocketmq.client.apis.ClientServiceProvider;
-import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
-import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
-import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
 import org.apache.rocketmq.client.apis.consumer.LitePushConsumer;
-import org.apache.rocketmq.client.apis.message.Message;
 import org.apache.rocketmq.client.apis.producer.Producer;
-import org.apache.rocketmq.client.apis.producer.ProducerBuilder;
-import org.apache.rocketmq.client.apis.producer.SendReceipt;
 import org.apache.rocketmq.shaded.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import static org.apache.rocketmq.a2a.common.RocketMQA2AConstant.DATA_PREFIX;
-import static 
org.apache.rocketmq.a2a.common.RocketMQA2AConstant.HTTPS_URL_PREFIX;
-import static 
org.apache.rocketmq.a2a.common.RocketMQA2AConstant.HTTP_URL_PREFIX;
 import static io.a2a.util.Assert.checkNotNullParam;
+import static 
org.apache.rocketmq.a2a.common.RocketMQA2AConstant.CANCEL_TASK_RESPONSE_REFERENCE;
+import static 
org.apache.rocketmq.a2a.common.RocketMQA2AConstant.GET_AUTHENTICATED_EXTENDED_CARD_RESPONSE_REFERENCE;
+import static 
org.apache.rocketmq.a2a.common.RocketMQA2AConstant.GET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE;
+import static 
org.apache.rocketmq.a2a.common.RocketMQA2AConstant.GET_TASK_RESPONSE_REFERENCE;
+import static 
org.apache.rocketmq.a2a.common.RocketMQA2AConstant.LIST_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE;
+import static 
org.apache.rocketmq.a2a.common.RocketMQA2AConstant.SEND_MESSAGE_RESPONSE_REFERENCE;
+import static 
org.apache.rocketmq.a2a.common.RocketMQA2AConstant.SET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE;
+import static 
org.apache.rocketmq.a2a.common.RocketMQResourceInfo.parseAgentCardAddition;
+import static 
org.apache.rocketmq.a2a.common.RocketMQUtil.LITE_TOPIC_USE_DEFAULT_RECOVER_MAP;
+import static 
org.apache.rocketmq.a2a.common.RocketMQUtil.MESSAGE_STREAM_RESPONSE_MAP;
+import static 
org.apache.rocketmq.a2a.common.RocketMQUtil.RECOVER_MESSAGE_STREAM_RESPONSE_MAP;
+import static org.apache.rocketmq.a2a.common.RocketMQUtil.checkConfigParam;
+import static org.apache.rocketmq.a2a.common.RocketMQUtil.getResult;
+import static org.apache.rocketmq.a2a.common.RocketMQUtil.initAndGetConsumer;
+import static org.apache.rocketmq.a2a.common.RocketMQUtil.initAndGetProducer;
+import static org.apache.rocketmq.a2a.common.RocketMQUtil.sendRocketMQRequest;
+import static org.apache.rocketmq.a2a.common.RocketMQUtil.unmarshalResponse;
 
 public class RocketMQTransport implements ClientTransport {
     private static final Logger log = 
LoggerFactory.getLogger(RocketMQTransport.class);
-    private static final TypeReference<SendMessageResponse> 
SEND_MESSAGE_RESPONSE_REFERENCE = new TypeReference<>() { };
-    private static final TypeReference<GetTaskResponse> 
GET_TASK_RESPONSE_REFERENCE = new TypeReference<>() { };
-    private static final TypeReference<CancelTaskResponse> 
CANCEL_TASK_RESPONSE_REFERENCE = new TypeReference<>() { };
-    private static final TypeReference<GetTaskPushNotificationConfigResponse> 
GET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>() { 
};
-    private static final TypeReference<SetTaskPushNotificationConfigResponse> 
SET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>() { 
};
-    private static final TypeReference<ListTaskPushNotificationConfigResponse> 
LIST_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>() { 
};
-    private static final 
TypeReference<DeleteTaskPushNotificationConfigResponse> 
DELETE_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>() 
{ };
-    private static final TypeReference<GetAuthenticatedExtendedCardResponse> 
GET_AUTHENTICATED_EXTENDED_CARD_RESPONSE_REFERENCE = new TypeReference<>() { };
-    private static final ConcurrentMap<String /* namespace */, Map<String /* 
WorkerAgentResponseTopic */, LitePushConsumer>> ROCKETMQ_CONSUMER_MAP = new 
ConcurrentHashMap<>();
-    private static final ConcurrentMap<String /* namespace */, Map<String /* 
agentTopic */, Producer>> ROCKETMQ_PRODUCER_MAP = new ConcurrentHashMap<>();
-    private static final ConcurrentMap<String /* namespace */, Map<String /* 
msgId */, CompletableFuture<String>>> MESSAGE_RESPONSE_MAP = new 
ConcurrentHashMap<>();
-    private static final ConcurrentMap<String /* namespace */, Map<String /* 
msgId */, SSEEventListener>> MESSAGE_STREAM_RESPONSE_MAP = new 
ConcurrentHashMap<>();
-    private static final ConcurrentMap<String /* namespace */, Map<String /* 
liteTopic */, Boolean>> LITE_TOPIC_USE_DEFAULT_RECOVER_MAP = new 
ConcurrentHashMap<>();
-    private static final ConcurrentMap<String /* namespace */, Map<String /* 
Key */, SSEEventListener>> RECOVER_MESSAGE_STREAM_RESPONSE_MAP = new 
ConcurrentHashMap<>();
-
     private final String agentTopic;
     private final String accessKey;
     private final String secretKey;
@@ -157,65 +132,25 @@ public class RocketMQTransport implements ClientTransport 
{
         this.agentTopic = rocketAgentCardInfo.getTopic();
         this.namespace = 
StringUtils.isEmpty(rocketAgentCardInfo.getNamespace()) ? "" : 
rocketAgentCardInfo.getNamespace();
         LITE_TOPIC_USE_DEFAULT_RECOVER_MAP.computeIfAbsent(this.namespace, k 
-> new HashMap<>()).put(this.liteTopic, useDefaultRecoverMode);
-        checkConfigParam();
-        initRocketMQProducerAndConsumer();
-    }
-
-    private void initRocketMQProducerAndConsumer() {
-        if (StringUtils.isEmpty(this.endpoint) || 
StringUtils.isEmpty(this.workAgentResponseTopic) || 
StringUtils.isEmpty(this.liteTopic)) {
-            throw new A2AClientException("RocketMQTransport 
initRocketMQProducerAndConsumer param error");
-        }
+        checkConfigParam(this.endpoint, this.workAgentResponseTopic, 
this.workAgentResponseGroupID, this.liteTopic, this.agentTopic);
         try {
-            Map<String, LitePushConsumer> consumerMap = 
ROCKETMQ_CONSUMER_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
-            if (consumerMap.containsKey(this.workAgentResponseTopic)) {
-                this.litePushConsumer = 
consumerMap.get(this.workAgentResponseTopic);
-                this.litePushConsumer.subscribeLite(this.liteTopic);
-            } else {
-                LitePushConsumer litePushConsumer = 
consumerMap.computeIfAbsent(this.workAgentResponseTopic, k -> {
-                    try {
-                        return buildConsumer();
-                    } catch (ClientException e) {
-                        log.error("RocketMQTransport 
initRocketMQProducerAndConsumer buildConsumer error: {}", e.getMessage());
-                        throw new RuntimeException(e);
-                    }
-                });
-                if (null != litePushConsumer) {
-                    litePushConsumer.subscribeLite(this.liteTopic);
-                    this.litePushConsumer = litePushConsumer;
-                }
-            }
-            Map<String, Producer> producerMap = 
ROCKETMQ_PRODUCER_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
-            if (!producerMap.containsKey(this.agentTopic)) {
-                this.producer = buildProducer(this.agentTopic);
-                producerMap.put(this.agentTopic, this.producer);
-            }
-            log.info("RocketMQTransport initRocketMQProducerAndConsumer 
success");
-        } catch (Exception e) {
-            log.error("RocketMQTransport initRocketMQProducerAndConsumer 
error: {}", e.getMessage());
+            this.litePushConsumer = initAndGetConsumer(this.namespace, 
this.endpoint, this.accessKey, this.secretKey, this.workAgentResponseTopic, 
this.workAgentResponseGroupID, this.liteTopic);
+            this.producer = initAndGetProducer(this.namespace, this.endpoint, 
this.accessKey, this.secretKey, this.agentTopic);
+        } catch (ClientException e) {
+            log.error("RocketMQTransport init rocketmq client error, e: {}", 
e.getMessage());
+            throw new RuntimeException("RocketMQTransport init rocketmq client 
error");
         }
     }
 
     @Override
     public EventKind sendMessage(MessageSendParams request, ClientCallContext 
context) throws A2AClientException {
-        SendMessageRequest sendMessageRequest = new 
SendMessageRequest.Builder()
-            .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
-            .method(SendMessageRequest.METHOD)
-            .params(request)
-            .build();
+        checkNotNullParam("request", request);
+        SendMessageRequest sendMessageRequest = new 
SendMessageRequest.Builder().jsonrpc(JSONRPCMessage.JSONRPC_VERSION).method(SendMessageRequest.METHOD).params(request).build();
         PayloadAndHeaders payloadAndHeaders = 
applyInterceptors(SendMessageRequest.METHOD, sendMessageRequest, 
this.agentCard, context);
         try {
             String liteTopic = dealLiteTopic(request.message().getContextId());
-            String responseMessageId = sendRocketMQRequest(payloadAndHeaders, 
liteTopic);
-            if (StringUtils.isEmpty(responseMessageId)) {
-                log.error("RocketMQTransport sendMessage error, 
responseMessageId is null");
-                return null;
-            }
-            Map<String, CompletableFuture<String>> completableFutureMap = 
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
-            CompletableFuture<String> objectCompletableFuture = new 
CompletableFuture<>();
-            completableFutureMap.put(responseMessageId, 
objectCompletableFuture);
-            String result = objectCompletableFuture.get(120, TimeUnit.SECONDS);
-            completableFutureMap.remove(responseMessageId);
-            SendMessageResponse response = 
unmarshalResponse(String.valueOf(result), SEND_MESSAGE_RESPONSE_REFERENCE);
+            String responseMessageId = sendRocketMQRequest(payloadAndHeaders, 
this.agentTopic, liteTopic, this.workAgentResponseTopic, this.producer);
+            SendMessageResponse response = 
unmarshalResponse(getResult(responseMessageId, this.namespace), 
SEND_MESSAGE_RESPONSE_REFERENCE);
             return response.getResult();
         } catch (Exception e) {
             log.error("RocketMQTransport sendMessage error: {}", 
e.getMessage());
@@ -227,16 +162,12 @@ public class RocketMQTransport implements ClientTransport 
{
     public void sendMessageStreaming(MessageSendParams request, 
Consumer<StreamingEventKind> eventConsumer, Consumer<Throwable> errorConsumer, 
ClientCallContext context) throws A2AClientException {
         checkNotNullParam("request", request);
         checkNotNullParam("eventConsumer", eventConsumer);
-        SendStreamingMessageRequest sendStreamingMessageRequest = new 
SendStreamingMessageRequest.Builder()
-            .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
-            .method(SendStreamingMessageRequest.METHOD)
-            .params(request)
-            .build();
+        SendStreamingMessageRequest sendStreamingMessageRequest = new 
SendStreamingMessageRequest.Builder().jsonrpc(JSONRPCMessage.JSONRPC_VERSION).method(SendStreamingMessageRequest.METHOD).params(request).build();
         PayloadAndHeaders payloadAndHeaders = 
applyInterceptors(SendStreamingMessageRequest.METHOD, 
sendStreamingMessageRequest, this.agentCard, context);
         SSEEventListener sseEventListener = new 
SSEEventListener(eventConsumer, errorConsumer);
         try {
             String liteTopic = dealLiteTopic(request.message().getContextId());
-            String responseMessageId = sendRocketMQRequest(payloadAndHeaders, 
liteTopic);
+            String responseMessageId = sendRocketMQRequest(payloadAndHeaders, 
this.agentTopic, liteTopic, this.workAgentResponseTopic, this.producer);
             if (StringUtils.isEmpty(responseMessageId)) {
                 log.error("RocketMQTransport sendMessageStreaming error, 
responseMessageId is null");
                 return;
@@ -266,7 +197,6 @@ public class RocketMQTransport implements ClientTransport {
                     log.info("litePushConsumer subscribeLite liteTopic: {}", 
liteTopic);
                     
LITE_TOPIC_USE_DEFAULT_RECOVER_MAP.computeIfAbsent(this.namespace, k -> new 
HashMap<>()).put(liteTopic, this.useDefaultRecoverMode);
                 }
-
                 String closeLiteTopic = 
(String)request.metadata().get(RocketMQA2AConstant.CLOSE_LITE_TOPIC);
                 if (null != litePushConsumer && 
!StringUtils.isEmpty(closeLiteTopic)) {
                     litePushConsumer.unsubscribeLite(closeLiteTopic);
@@ -284,24 +214,12 @@ public class RocketMQTransport implements ClientTransport 
{
 
     @Override
     public Task getTask(TaskQueryParams request, ClientCallContext context) 
throws A2AClientException {
-        GetTaskRequest getTaskRequest = new GetTaskRequest.Builder()
-            .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
-            .method(GetTaskRequest.METHOD)
-            .params(request)
-            .build();
+        checkNotNullParam("request", request);
+        GetTaskRequest getTaskRequest = new 
GetTaskRequest.Builder().jsonrpc(JSONRPCMessage.JSONRPC_VERSION).method(GetTaskRequest.METHOD).params(request).build();
         PayloadAndHeaders payloadAndHeaders = 
applyInterceptors(GetTaskRequest.METHOD, getTaskRequest, this.agentCard, 
context);
         try {
-            String responseMessageId = sendRocketMQRequest(payloadAndHeaders, 
this.liteTopic);
-            if (StringUtils.isEmpty(responseMessageId)) {
-                log.error("RocketMQTransport getTask error, responseMessageId 
is null");
-                return null;
-            }
-            Map<String, CompletableFuture<String>> completableFutureMap = 
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
-            CompletableFuture<String> objectCompletableFuture = new 
CompletableFuture<>();
-            completableFutureMap.put(responseMessageId, 
objectCompletableFuture);
-            String result = objectCompletableFuture.get(120, TimeUnit.SECONDS);
-            completableFutureMap.remove(responseMessageId);
-            GetTaskResponse response = unmarshalResponse(result, 
GET_TASK_RESPONSE_REFERENCE);
+            String responseMessageId = sendRocketMQRequest(payloadAndHeaders, 
this.agentTopic, liteTopic, this.workAgentResponseTopic, this.producer);
+            GetTaskResponse response = 
unmarshalResponse(getResult(responseMessageId, this.namespace), 
GET_TASK_RESPONSE_REFERENCE);
             return response.getResult();
         } catch (Exception e) {
             log.error("RocketMQTransport getTask error: {}", e.getMessage());
@@ -312,24 +230,11 @@ public class RocketMQTransport implements ClientTransport 
{
     @Override
     public Task cancelTask(TaskIdParams request, ClientCallContext context) 
throws A2AClientException {
         checkNotNullParam("request", request);
-        CancelTaskRequest cancelTaskRequest = new CancelTaskRequest.Builder()
-            .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
-            .method(CancelTaskRequest.METHOD)
-            .params(request)
-            .build();
+        CancelTaskRequest cancelTaskRequest = new 
CancelTaskRequest.Builder().jsonrpc(JSONRPCMessage.JSONRPC_VERSION).method(CancelTaskRequest.METHOD).params(request).build();
         PayloadAndHeaders payloadAndHeaders = 
applyInterceptors(CancelTaskRequest.METHOD, cancelTaskRequest, this.agentCard, 
context);
         try {
-            String responseMessageId = sendRocketMQRequest(payloadAndHeaders, 
this.liteTopic);
-            if (StringUtils.isEmpty(responseMessageId)) {
-                log.error("RocketMQTransport cancelTask error, 
responseMessageId is null");
-                return null;
-            }
-            Map<String, CompletableFuture<String>> completableFutureMap = 
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
-            CompletableFuture<String> objectCompletableFuture = new 
CompletableFuture<>();
-            completableFutureMap.put(responseMessageId, 
objectCompletableFuture);
-            String result = objectCompletableFuture.get(120, TimeUnit.SECONDS);
-            completableFutureMap.remove(responseMessageId);
-            CancelTaskResponse response = unmarshalResponse(result, 
CANCEL_TASK_RESPONSE_REFERENCE);
+            String responseMessageId = sendRocketMQRequest(payloadAndHeaders, 
this.agentTopic, liteTopic, this.workAgentResponseTopic, this.producer);
+            CancelTaskResponse response = 
unmarshalResponse(getResult(responseMessageId, this.namespace), 
CANCEL_TASK_RESPONSE_REFERENCE);
             return response.getResult();
         } catch (Exception e) {
             log.error("RocketMQTransport cancelTask error: {}", 
e.getMessage());
@@ -340,25 +245,11 @@ public class RocketMQTransport implements ClientTransport 
{
     @Override
     public TaskPushNotificationConfig 
setTaskPushNotificationConfiguration(TaskPushNotificationConfig request, 
ClientCallContext context) throws A2AClientException {
         checkNotNullParam("request", request);
-        SetTaskPushNotificationConfigRequest setTaskPushNotificationRequest = 
new SetTaskPushNotificationConfigRequest.Builder()
-            .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
-            .method(SetTaskPushNotificationConfigRequest.METHOD)
-            .params(request)
-            .build();
-
+        SetTaskPushNotificationConfigRequest setTaskPushNotificationRequest = 
new 
SetTaskPushNotificationConfigRequest.Builder().jsonrpc(JSONRPCMessage.JSONRPC_VERSION).method(SetTaskPushNotificationConfigRequest.METHOD).params(request).build();
         PayloadAndHeaders payloadAndHeaders = 
applyInterceptors(SetTaskPushNotificationConfigRequest.METHOD, 
setTaskPushNotificationRequest, agentCard, context);
         try {
-            String responseMessageId = sendRocketMQRequest(payloadAndHeaders, 
this.liteTopic);
-            if (StringUtils.isEmpty(responseMessageId)) {
-                log.error("RocketMQTransport 
setTaskPushNotificationConfiguration error, responseMessageId is null");
-                return null;
-            }
-            Map<String, CompletableFuture<String>> completableFutureMap = 
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
-            CompletableFuture<String> objectCompletableFuture = new 
CompletableFuture<>();
-            completableFutureMap.put(responseMessageId, 
objectCompletableFuture);
-            String result = objectCompletableFuture.get(120, TimeUnit.SECONDS);
-            completableFutureMap.remove(responseMessageId);
-            SetTaskPushNotificationConfigResponse response = 
unmarshalResponse(result, SET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE);
+            String responseMessageId = sendRocketMQRequest(payloadAndHeaders, 
this.agentTopic, liteTopic, this.workAgentResponseTopic, this.producer);
+            SetTaskPushNotificationConfigResponse response = 
unmarshalResponse(getResult(responseMessageId, this.namespace), 
SET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE);
             return response.getResult();
         } catch (Exception e) {
             log.error("RocketMQTransport setTaskPushNotificationConfiguration 
error: {}", e.getMessage());
@@ -369,26 +260,11 @@ public class RocketMQTransport implements ClientTransport 
{
     @Override
     public TaskPushNotificationConfig 
getTaskPushNotificationConfiguration(GetTaskPushNotificationConfigParams 
request, ClientCallContext context) throws A2AClientException {
         checkNotNullParam("request", request);
-        GetTaskPushNotificationConfigRequest getTaskPushNotificationRequest
-            = new GetTaskPushNotificationConfigRequest.Builder()
-            .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
-            .method(GetTaskPushNotificationConfigRequest.METHOD)
-            .params(request)
-            .build();
-
+        GetTaskPushNotificationConfigRequest getTaskPushNotificationRequest = 
new 
GetTaskPushNotificationConfigRequest.Builder().jsonrpc(JSONRPCMessage.JSONRPC_VERSION).method(GetTaskPushNotificationConfigRequest.METHOD).params(request).build();
         PayloadAndHeaders payloadAndHeaders = 
applyInterceptors(GetTaskPushNotificationConfigRequest.METHOD, 
getTaskPushNotificationRequest, this.agentCard, context);
         try {
-            String responseMessageId = sendRocketMQRequest(payloadAndHeaders, 
this.liteTopic);
-            if (StringUtils.isEmpty(responseMessageId)) {
-                log.error("RocketMQTransport 
getTaskPushNotificationConfiguration error, responseMessageId is null");
-                return null;
-            }
-            Map<String, CompletableFuture<String>> completableFutureMap = 
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
-            CompletableFuture<String> completableFuture = new 
CompletableFuture<>();
-            completableFutureMap.put(responseMessageId, completableFuture);
-            String result = completableFuture.get(120, TimeUnit.SECONDS);
-            completableFutureMap.remove(responseMessageId);
-            GetTaskPushNotificationConfigResponse response = 
unmarshalResponse(result, GET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE);
+            String responseMessageId = sendRocketMQRequest(payloadAndHeaders, 
this.agentTopic, liteTopic, this.workAgentResponseTopic, this.producer);
+            GetTaskPushNotificationConfigResponse response = 
unmarshalResponse(getResult(responseMessageId, this.namespace), 
GET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE);
             return response.getResult();
         } catch (Exception e) {
             log.error("RocketMQTransport getTaskPushNotificationConfiguration 
error: {}", e.getMessage());
@@ -399,24 +275,11 @@ public class RocketMQTransport implements ClientTransport 
{
     @Override
     public List<TaskPushNotificationConfig> 
listTaskPushNotificationConfigurations(ListTaskPushNotificationConfigParams 
request, ClientCallContext context) throws A2AClientException {
         checkNotNullParam("request", request);
-        ListTaskPushNotificationConfigRequest listTaskPushNotificationRequest 
= new ListTaskPushNotificationConfigRequest.Builder()
-            .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
-            .method(ListTaskPushNotificationConfigRequest.METHOD)
-            .params(request)
-            .build();
+        ListTaskPushNotificationConfigRequest listTaskPushNotificationRequest 
= new 
ListTaskPushNotificationConfigRequest.Builder().jsonrpc(JSONRPCMessage.JSONRPC_VERSION).method(ListTaskPushNotificationConfigRequest.METHOD).params(request).build();
         PayloadAndHeaders payloadAndHeaders = 
applyInterceptors(ListTaskPushNotificationConfigRequest.METHOD, 
listTaskPushNotificationRequest, this.agentCard, context);
         try {
-            String responseMessageId = sendRocketMQRequest(payloadAndHeaders, 
this.liteTopic);
-            if (StringUtils.isEmpty(responseMessageId)) {
-                log.error("RocketMQTransport 
listTaskPushNotificationConfigurations error, responseMessageId is null");
-                return null;
-            }
-            Map<String, CompletableFuture<String>> completableFutureMap = 
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
-            CompletableFuture<String> objectCompletableFuture = new 
CompletableFuture<>();
-            completableFutureMap.put(responseMessageId, 
objectCompletableFuture);
-            String result = objectCompletableFuture.get(120, TimeUnit.SECONDS);
-            completableFutureMap.remove(responseMessageId);
-            ListTaskPushNotificationConfigResponse response = 
unmarshalResponse(result, 
LIST_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE);
+            String responseMessageId = sendRocketMQRequest(payloadAndHeaders, 
this.agentTopic, liteTopic, this.workAgentResponseTopic, this.producer);
+            ListTaskPushNotificationConfigResponse response = 
unmarshalResponse(getResult(responseMessageId, this.namespace), 
LIST_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE);
             return response.getResult();
         } catch (Exception e) {
             log.error("RocketMQTransport 
listTaskPushNotificationConfigurations error: {}", e.getMessage());
@@ -427,23 +290,11 @@ public class RocketMQTransport implements ClientTransport 
{
     @Override
     public void 
deleteTaskPushNotificationConfigurations(DeleteTaskPushNotificationConfigParams 
request, ClientCallContext context) throws A2AClientException {
         checkNotNullParam("request", request);
-        DeleteTaskPushNotificationConfigRequest 
deleteTaskPushNotificationRequest = new 
DeleteTaskPushNotificationConfigRequest.Builder()
-            .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
-            .method(DeleteTaskPushNotificationConfigRequest.METHOD)
-            .params(request)
-            .build();
+        DeleteTaskPushNotificationConfigRequest 
deleteTaskPushNotificationRequest = new 
DeleteTaskPushNotificationConfigRequest.Builder().jsonrpc(JSONRPCMessage.JSONRPC_VERSION).method(DeleteTaskPushNotificationConfigRequest.METHOD).params(request).build();
         PayloadAndHeaders payloadAndHeaders = 
applyInterceptors(DeleteTaskPushNotificationConfigRequest.METHOD, 
deleteTaskPushNotificationRequest, agentCard, context);
         try {
-            String responseMessageId = sendRocketMQRequest(payloadAndHeaders, 
this.liteTopic);
-            if (StringUtils.isEmpty(responseMessageId)) {
-                log.error("RocketMQTransport 
deleteTaskPushNotificationConfigurations error, responseMessageId is null");
-                return;
-            }
-            Map<String, CompletableFuture<String>> completableFutureMap = 
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
-            CompletableFuture<String> objectCompletableFuture = new 
CompletableFuture<>();
-            completableFutureMap.put(responseMessageId, 
objectCompletableFuture);
-            objectCompletableFuture.get(120, TimeUnit.SECONDS);
-            completableFutureMap.remove(responseMessageId);
+            String responseMessageId = sendRocketMQRequest(payloadAndHeaders, 
this.agentTopic, liteTopic, this.workAgentResponseTopic, this.producer);
+            getResult(responseMessageId, this.namespace);
         } catch (Exception e) {
             log.error("RocketMQTransport 
deleteTaskPushNotificationConfigurations error: {}", e.getMessage());
         }
@@ -462,22 +313,10 @@ public class RocketMQTransport implements ClientTransport 
{
                 return agentCard;
             }
             try {
-                GetAuthenticatedExtendedCardRequest 
getExtendedAgentCardRequest = new GetAuthenticatedExtendedCardRequest.Builder()
-                    .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
-                    .method(GetAuthenticatedExtendedCardRequest.METHOD)
-                    .build(); // id will be randomly generated
+                GetAuthenticatedExtendedCardRequest 
getExtendedAgentCardRequest = new 
GetAuthenticatedExtendedCardRequest.Builder().jsonrpc(JSONRPCMessage.JSONRPC_VERSION).method(GetAuthenticatedExtendedCardRequest.METHOD).build();
 // id will be randomly generated
                 PayloadAndHeaders payloadAndHeaders = 
applyInterceptors(GetAuthenticatedExtendedCardRequest.METHOD, 
getExtendedAgentCardRequest, this.agentCard, context);
-                String responseMessageId = 
sendRocketMQRequest(payloadAndHeaders, this.liteTopic);
-                if (StringUtils.isEmpty(responseMessageId)) {
-                    log.error("RocketMQTransport getAgentCard 
responseMessageId is null");
-                    return null;
-                }
-                Map<String, CompletableFuture<String>> completableFutureMap = 
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
-                CompletableFuture<String> objectCompletableFuture = new 
CompletableFuture<>();
-                completableFutureMap.put(responseMessageId, 
objectCompletableFuture);
-                String result = objectCompletableFuture.get(120, 
TimeUnit.SECONDS);
-                completableFutureMap.remove(responseMessageId);
-                GetAuthenticatedExtendedCardResponse response = 
unmarshalResponse(result, GET_AUTHENTICATED_EXTENDED_CARD_RESPONSE_REFERENCE);
+                String responseMessageId = 
sendRocketMQRequest(payloadAndHeaders, this.agentTopic, liteTopic, 
this.workAgentResponseTopic, this.producer);
+                GetAuthenticatedExtendedCardResponse response = 
unmarshalResponse(getResult(responseMessageId, this.namespace), 
GET_AUTHENTICATED_EXTENDED_CARD_RESPONSE_REFERENCE);
                 return response.getResult();
             } catch (Exception e) {
                 throw new A2AClientException("RocketMQTransport getAgentCard 
error: " + e, e);
@@ -488,42 +327,7 @@ public class RocketMQTransport implements ClientTransport {
     }
 
     @Override
-    public void close() {
-        try {
-            if (null != this.producer) {
-                this.producer.close();
-            }
-            if (null != this.litePushConsumer) {
-                this.litePushConsumer.close();
-            }
-            log.info("RocketMQTransport close success");
-        } catch (Exception e) {
-            log.error("RocketMQTransport close error: {}", e.getMessage());
-        }
-    }
-
-    private void checkConfigParam() {
-        if (StringUtils.isEmpty(this.endpoint) || 
StringUtils.isEmpty(this.workAgentResponseTopic) ||
-            StringUtils.isEmpty(this.workAgentResponseGroupID) || 
StringUtils.isEmpty(this.liteTopic) || StringUtils.isEmpty(agentTopic)) {
-
-            if (StringUtils.isEmpty(this.endpoint)) {
-                log.error("RocketMQTransport checkConfigParam endpoint is 
empty");
-            }
-            if (StringUtils.isEmpty(this.workAgentResponseTopic)) {
-                log.error("RocketMQTransport checkConfigParam 
workAgentResponseTopic is empty");
-            }
-            if (StringUtils.isEmpty(this.workAgentResponseGroupID)) {
-                log.error("RocketMQTransport checkConfigParam 
workAgentResponseGroupID is empty");
-            }
-            if (StringUtils.isEmpty(this.liteTopic)) {
-                log.error("RocketMQTransport checkConfigParam liteTopic is 
empty");
-            }
-            if (StringUtils.isEmpty(this.agentTopic)) {
-                log.error("RocketMQTransport checkConfigParam agentTopic is 
empty");
-            }
-            throw new RuntimeException("RocketMQTransport checkConfigParam 
error, init failed !!!");
-        }
-    }
+    public void close() {}
 
     private String dealLiteTopic(String contextId) {
         String liteTopic = this.liteTopic;
@@ -532,200 +336,12 @@ public class RocketMQTransport implements 
ClientTransport {
                 litePushConsumer.subscribeLite(contextId);
                 liteTopic = contextId;
             } catch (ClientException e) {
-
+                log.error("dealLiteTopic error: {}", e.getMessage());
             }
         }
         return liteTopic;
     }
 
-    private LitePushConsumer buildConsumer() throws ClientException {
-        if (StringUtils.isEmpty(this.endpoint) || 
StringUtils.isEmpty(this.workAgentResponseGroupID) || 
StringUtils.isEmpty(this.workAgentResponseTopic)) {
-            log.error("RocketMQTransport buildConsumer check param error");
-            return null;
-        }
-        final ClientServiceProvider provider = 
ClientServiceProvider.loadService();
-        SessionCredentialsProvider sessionCredentialsProvider = new 
StaticSessionCredentialsProvider(accessKey, secretKey);
-        ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder()
-            .setEndpoints(this.endpoint)
-            .setNamespace(this.namespace)
-            .setCredentialProvider(sessionCredentialsProvider)
-            .build();
-        LitePushConsumer litePushConsumer = 
provider.newLitePushConsumerBuilder()
-            .setClientConfiguration(clientConfiguration)
-            .setConsumerGroup(this.workAgentResponseGroupID)
-            .bindTopic(this.workAgentResponseTopic)
-            .setMessageListener(messageView -> {
-                try {
-                    Optional<String> liteTopicOpt = messageView.getLiteTopic();
-                    String liteTopic = liteTopicOpt.get();
-                    if (StringUtils.isEmpty(liteTopic)) {
-                        log.error("RocketMQTransport buildConsumer liteTopic 
is empty");
-                        return ConsumeResult.SUCCESS;
-                    }
-                    byte[] result = new 
byte[messageView.getBody().remaining()];
-                    messageView.getBody().get(result);
-                    String resultStr = new String(result, 
StandardCharsets.UTF_8);
-                    RocketMQResponse response = JSON.parseObject(resultStr, 
RocketMQResponse.class);
-                    if (null == response || 
StringUtils.isEmpty(response.getMessageId())) {
-                        log.error("RocketMQTransport litePushConsumer consumer 
error, response is null or messageId is empty");
-                        return ConsumeResult.SUCCESS;
-                    }
-                    if (!response.isStream()) {
-                        return dealNonStreamResult(response, this.namespace);
-                    }
-                    return dealStreamResult(response, this.namespace, 
liteTopic);
-                } catch (Exception e) {
-                    log.error("RocketMQTransport litePushConsumer consumer 
error, msgId: {}, error: {}", messageView.getMessageId(), e.getMessage());
-                    return ConsumeResult.SUCCESS;
-                }
-            }).build();
-        return litePushConsumer;
-    }
-
-    private ConsumeResult dealStreamResult(RocketMQResponse response, String 
namespace, String liteTopic) {
-        if (null == response || StringUtils.isEmpty(response.getMessageId()) 
|| StringUtils.isEmpty(liteTopic) || !response.isEnd() && 
StringUtils.isEmpty(response.getResponseBody())) {
-            log.error("RocketMQTransport dealStreamResult param is error, 
response: {}, liteTopic: {}", JSON.toJSONString(response), liteTopic);
-            return ConsumeResult.SUCCESS;
-        }
-
-        Map<String, SSEEventListener> sseEventListenerMap = 
MESSAGE_STREAM_RESPONSE_MAP.get(namespace);
-        if (null == sseEventListenerMap) {
-            log.error("RocketMQTransport dealStreamResult sseEventListenerMap 
is null");
-            return ConsumeResult.SUCCESS;
-        }
-        SSEEventListener sseEventListener = 
sseEventListenerMap.get(response.getMessageId());
-        if (null == sseEventListener) {
-            Map<String, Boolean> booleanMap = 
LITE_TOPIC_USE_DEFAULT_RECOVER_MAP.get(namespace);
-            if (null == booleanMap) {
-                log.error("RocketMQTransport dealStreamResult booleanMap is 
null");
-                return ConsumeResult.SUCCESS;
-            }
-            Boolean useDefaultRecoverModeConsumer = booleanMap.get(liteTopic);
-            if (null == useDefaultRecoverModeConsumer || 
!useDefaultRecoverModeConsumer) {
-                return ConsumeResult.SUCCESS;
-            }
-            if (!RECOVER_MESSAGE_STREAM_RESPONSE_MAP.isEmpty() && 
RECOVER_MESSAGE_STREAM_RESPONSE_MAP.containsKey(namespace)) {
-                Map<String, SSEEventListener> sseEventListenerMapRecover = 
RECOVER_MESSAGE_STREAM_RESPONSE_MAP.get(namespace);
-                if (null == sseEventListenerMapRecover) {
-                    log.error("RocketMQTransport dealStreamResult 
sseEventListenerMapRecover is null");
-                    return ConsumeResult.SUCCESS;
-                }
-                sseEventListener = 
sseEventListenerMapRecover.get(RocketMQA2AConstant.DEFAULT_STREAM_RECOVER);
-                if (null == sseEventListener) {
-                    log.error("RocketMQTransport dealStreamResult 
sseEventListenerMapRecover get sseEventListener is null");
-                    return ConsumeResult.SUCCESS;
-                }
-            }
-            if (null == sseEventListener) {
-                return ConsumeResult.SUCCESS;
-            }
-        }
-        String item = response.getResponseBody();
-        if (!StringUtils.isEmpty(item) && item.startsWith(DATA_PREFIX)) {
-            item = item.substring(5).trim();
-            if (!item.isEmpty()) {
-                try {
-                    sseEventListener.onMessage(item, new 
CompletableFuture<>());
-                } catch (Throwable e) {
-                    log.error("RocketMQTransport dealStreamResult error: {}", 
e.getMessage());
-                    return ConsumeResult.FAILURE;
-                }
-            }
-            if (response.isEnd() && 
!StringUtils.isEmpty(response.getMessageId())) {
-                sseEventListenerMap.remove(response.getMessageId());
-            }
-        }
-        return ConsumeResult.SUCCESS;
-    }
-
-    private ConsumeResult dealNonStreamResult(RocketMQResponse response, 
String namespace) {
-        if (null == response || StringUtils.isEmpty(response.getMessageId()) 
|| StringUtils.isEmpty(response.getResponseBody())) {
-            log.error("RocketMQTransport dealNonStreamResult param is error, 
response: {}", JSON.toJSONString(response));
-            return ConsumeResult.SUCCESS;
-        }
-        Map<String, CompletableFuture<String>> completableFutureMap = 
MESSAGE_RESPONSE_MAP.get(namespace);
-        if (null != completableFutureMap && 
completableFutureMap.containsKey(response.getMessageId())) {
-            CompletableFuture<String> completableFuture = 
completableFutureMap.get(response.getMessageId());
-            completableFuture.complete(response.getResponseBody());
-        }
-        return ConsumeResult.SUCCESS;
-    }
-
-    private String sendRocketMQRequest(PayloadAndHeaders payloadAndHeaders, 
String liteTopic) throws JsonProcessingException {
-        if (null == payloadAndHeaders || StringUtils.isEmpty(this.agentTopic) 
|| StringUtils.isEmpty(liteTopic) || 
StringUtils.isEmpty(this.workAgentResponseTopic)) {
-            log.error("RocketMQTransport sendRocketMQRequest error, 
payloadAndHeaders: {}, agentTopic: {}, workAgentResponseTopic: {}, liteTopic: 
{}", payloadAndHeaders, this.agentTopic, this.workAgentResponseTopic, 
this.liteTopic);
-            return null;
-        }
-        RocketMQRequest request = new RocketMQRequest();
-        
request.setRequestBody(Utils.OBJECT_MAPPER.writeValueAsString(payloadAndHeaders.getPayload()));
-        request.setAgentTopic(this.agentTopic);
-        request.setWorkAgentResponseTopic(this.workAgentResponseTopic);
-        request.setLiteTopic(liteTopic);
-        if (payloadAndHeaders.getHeaders() != null) {
-            for (Map.Entry<String, String> entry : 
payloadAndHeaders.getHeaders().entrySet()) {
-                request.addHeader(entry.getKey(), entry.getValue());
-            }
-        }
-        String messageBodyStr = serialText(request);
-        if (StringUtils.isEmpty(messageBodyStr)) {
-            return null;
-        }
-        final ClientServiceProvider provider = 
ClientServiceProvider.loadService();
-        if (null == producer) {
-            log.error("RocketMQTransport sendRocketMQRequest producer is null, 
agentTopic: {}", this.agentTopic);
-            return null;
-        }
-        byte[] body = messageBodyStr.getBytes(StandardCharsets.UTF_8);
-        final Message message = provider.newMessageBuilder()
-            .setTopic(this.agentTopic)
-            .setBody(body)
-            .build();
-        try {
-            final SendReceipt sendReceipt = producer.send(message);
-            if (!StringUtils.isEmpty(sendReceipt.getMessageId().toString())) {
-                return sendReceipt.getMessageId().toString();
-            }
-        } catch (Throwable t) {
-            return null;
-        }
-        return null;
-    }
-    private static void printPrompt(String role) {
-        System.out.print("\n\u001B[36m" + role + " > \u001B[0m");
-    }
-
-    private <T extends JSONRPCResponse<?>> T unmarshalResponse(String 
response, TypeReference<T> typeReference)
-        throws A2AClientException, JsonProcessingException {
-        T value = Utils.unmarshalFrom(response, typeReference);
-        JSONRPCError error = value.getError();
-        if (error != null) {
-            throw new A2AClientException(error.getMessage() + (error.getData() 
!= null ? ": " + error.getData() : ""), error);
-        }
-        return value;
-    }
-
-    private static String serialText(RocketMQRequest rocketMQRequest) {
-        if (null == rocketMQRequest || 
StringUtils.isEmpty(rocketMQRequest.getRequestBody()) || 
StringUtils.isEmpty(rocketMQRequest.getWorkAgentResponseTopic()) || 
StringUtils.isEmpty(rocketMQRequest.getLiteTopic()) || 
StringUtils.isEmpty(rocketMQRequest.getAgentTopic())) {
-            return null;
-        }
-        return JSON.toJSONString(rocketMQRequest);
-    }
-
-    private Producer buildProducer(String... topics) throws ClientException {
-        final ClientServiceProvider provider = 
ClientServiceProvider.loadService();
-        SessionCredentialsProvider sessionCredentialsProvider = new 
StaticSessionCredentialsProvider(accessKey, secretKey);
-        ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder()
-            .setEndpoints(this.endpoint)
-            .setNamespace(this.namespace)
-            .setCredentialProvider(sessionCredentialsProvider)
-            .setRequestTimeout(Duration.ofSeconds(15))
-            .build();
-        final ProducerBuilder builder = provider.newProducerBuilder()
-            .setClientConfiguration(clientConfiguration)
-            .setTopics(topics);
-        return builder.build();
-    }
-
     private PayloadAndHeaders applyInterceptors(String methodName, Object 
payload, AgentCard agentCard, ClientCallContext clientCallContext) {
         PayloadAndHeaders payloadAndHeaders = new PayloadAndHeaders(payload, 
getHttpHeaders(clientCallContext));
         if (interceptors != null && !interceptors.isEmpty()) {
@@ -739,89 +355,4 @@ public class RocketMQTransport implements ClientTransport {
     private Map<String, String> getHttpHeaders(@Nullable ClientCallContext 
context) {
         return context != null ? context.getHeaders() : Collections.emptyMap();
     }
-
-    private RocketMQResourceInfo parseAgentCardAddition(AgentCard agentCard) {
-        if (null == agentCard || 
StringUtils.isEmpty(agentCard.preferredTransport()) || 
StringUtils.isEmpty(agentCard.url()) || null == 
agentCard.additionalInterfaces() || agentCard.additionalInterfaces().isEmpty()) 
{
-            log.error("parseAgentCardAddition param error, agentCard: {}", 
JSON.toJSONString(agentCard));
-            return null;
-        }
-        RocketMQResourceInfo rocketMQResourceInfo = null;
-        String preferredTransport = agentCard.preferredTransport();
-        if (RocketMQA2AConstant.ROCKETMQ_PROTOCOL.equals(preferredTransport)) {
-            String url = agentCard.url();
-            rocketMQResourceInfo = pareAgentCardUrl(url);
-            if (null != rocketMQResourceInfo && 
!StringUtils.isEmpty(rocketMQResourceInfo.getEndpoint()) && 
!StringUtils.isEmpty(rocketMQResourceInfo.getTopic())) {
-                log.info("RocketMQTransport get rocketMQResourceInfo from 
preferredTransport");
-                return rocketMQResourceInfo;
-            }
-        }
-        List<AgentInterface> agentInterfaces = 
agentCard.additionalInterfaces();
-        for (AgentInterface agentInterface : agentInterfaces) {
-            String transport = agentInterface.transport();
-            if (!StringUtils.isEmpty(transport) && 
RocketMQA2AConstant.ROCKETMQ_PROTOCOL.equals(transport)) {
-                String url = agentInterface.url();
-                rocketMQResourceInfo = pareAgentCardUrl(url);
-                if (null != rocketMQResourceInfo && 
!StringUtils.isEmpty(rocketMQResourceInfo.getEndpoint()) && 
!StringUtils.isEmpty(rocketMQResourceInfo.getTopic())) {
-                    log.error("RocketMQTransport get rocketMQResourceInfo from 
additionalInterfaces");
-                    return rocketMQResourceInfo;
-                }
-            }
-        }
-        return null;
-    }
-
-    private static RocketMQResourceInfo pareAgentCardUrl(String agentCardUrl) {
-        if (StringUtils.isEmpty(agentCardUrl)) {
-            return null;
-        }
-        String agentUrl = agentCardUrl.replace(HTTP_URL_PREFIX, "");
-        String replaceFinal = agentUrl.replace(HTTPS_URL_PREFIX, "");
-        String[] split = replaceFinal.split("/");
-        if (split.length != 3) {
-            return null;
-        }
-        RocketMQResourceInfo rocketMQResourceInfo = new RocketMQResourceInfo();
-        rocketMQResourceInfo.setEndpoint(split[0].trim());
-        rocketMQResourceInfo.setNamespace(split[1].trim());
-        rocketMQResourceInfo.setTopic(split[2].trim());
-        return rocketMQResourceInfo;
-    }
-
-    private static class RocketMQResourceInfo {
-        private String endpoint;
-        private String topic;
-        private String namespace;
-
-        public RocketMQResourceInfo(String endpoint, String topic) {
-            this.endpoint = endpoint;
-            this.topic = topic;
-        }
-
-        public RocketMQResourceInfo() {}
-
-        public String getEndpoint() {
-            return endpoint;
-        }
-
-        public void setEndpoint(String endpoint) {
-            this.endpoint = endpoint;
-        }
-
-        public String getTopic() {
-            return topic;
-        }
-
-        public void setTopic(String topic) {
-            this.topic = topic;
-        }
-
-        public String getNamespace() {
-            return namespace;
-        }
-
-        public void setNamespace(String namespace) {
-            this.namespace = namespace;
-        }
-    }
-
 }

Reply via email to