This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-a2a.git
The following commit(s) were added to refs/heads/main by this push:
new 9eb59ae Optimize the code (#8)
9eb59ae is described below
commit 9eb59aeae4366edea47fc71b6480148e3630fe35
Author: Drizzle <[email protected]>
AuthorDate: Tue Dec 16 09:56:02 2025 +0800
Optimize the code (#8)
* update
Change-Id: I2b5f6364699c1c9e07b6ad381b801c49ce641559
* optimize the code
Change-Id: If2495360c76970ac88387e59e3ac15a6237e2f3e
* update
Change-Id: Iceef7a1bd1d05bfa498694ba7c3a007bbfdf378e
* update
Change-Id: Id0b75d941f02387d7b05a96731ea936526bae012
* add spring web demo
Change-Id: I74483972366bd35fee4a8e6c58b928204b3669ed
* update example
Change-Id: I00ec96acdbf1b7b5aabad5d28b10667b605635e1
* update
Change-Id: I0f51bf41c079d782a1a3adbbd388a32a50170980
* update
Change-Id: Id0e84eb0f4aa59d7aa020b36a7de85a8583f142d
* update demo
Change-Id: Ic7af56d1b946b119816848725465b60335bfc9ee
* optimzie the code
Change-Id: Ie45c5bbb6d27af2dd2bd2c66e5b0ee18faa91a41
* update
Change-Id: I930f4792680e577d14b6ae5f28264c03e93d987c
* change the version
Change-Id: I24e144b98c8e7ff5939c2ebdf609c8535aa9bce8
---------
Co-authored-by: drizzle.zk <[email protected]>
---
example/rocketmq-multiagent-base-adk/README.md | 4 +-
.../SupervisorAgent-Web/pom.xml | 2 +-
.../java/org/example/service/AgentService.java | 2 +-
.../agent/SupervisorAgentA2ASDKMainStream.java | 5 +-
example/rocketmq-multiagent-base-adk/pom.xml | 2 +-
.../a2a/server/RocketMQA2AServerRoutes.java | 14 +--
.../rocketmq/a2a/transport/RocketMQTransport.java | 100 ++++++++++-----------
.../a2a/transport/RocketMQTransportConfig.java | 38 ++++----
.../a2a/transport/RocketMQTransportProvider.java | 2 +-
9 files changed, 82 insertions(+), 87 deletions(-)
diff --git a/example/rocketmq-multiagent-base-adk/README.md
b/example/rocketmq-multiagent-base-adk/README.md
index f0383a6..5900c88 100644
--- a/example/rocketmq-multiagent-base-adk/README.md
+++ b/example/rocketmq-multiagent-base-adk/README.md
@@ -44,7 +44,7 @@ cd WeatherAgent
```
```shell
-MAVEN_OPTS="-DrocketMQEndpoint= -DrocketMQNamespace=
-DbizTopic=WeatherAgentTask -DbizConsumerGroup=WeatherAgentTaskConsumerGroup
-DrocketMQAk= -DrocketMQSk= -DapiKey= -DappId= " mvn quarkus:dev
+MAVEN_OPTS="-DrocketMQEndpoint= -DrocketMQNamespace=
-DbizTopic=WeatherAgentTask -DbizConsumerGroup=WeatherAgentTaskConsumerGroup
-DrocketMQAK= -DrocketMQSK= -DapiKey= -DappId= " mvn quarkus:dev
```

@@ -54,7 +54,7 @@ cd TravelAgent
```
```shell
- MAVEN_OPTS="-DrocketMQEndpoint= -DrocketMQNamespace=
-DbizTopic=TravelAgentTask -DbizConsumerGroup=TravelAgentTaskConsumerGroup
-DrocketMQAk= -DrocketMQSk= -DapiKey= -DappId= " mvn quarkus:dev
+ MAVEN_OPTS="-DrocketMQEndpoint= -DrocketMQNamespace=
-DbizTopic=TravelAgentTask -DbizConsumerGroup=TravelAgentTaskConsumerGroup
-DrocketMQAK= -DrocketMQSK= -DapiKey= -DappId= " mvn quarkus:dev
```

diff --git a/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/pom.xml
b/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/pom.xml
index 284f0f5..7810d9a 100644
--- a/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/pom.xml
+++ b/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/pom.xml
@@ -119,7 +119,7 @@
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-a2a</artifactId>
- <version>1.0.6</version>
+ <version>1.0.7</version>
</dependency>
</dependencies>
diff --git
a/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/java/org/example/service/AgentService.java
b/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/java/org/example/service/AgentService.java
index ad19009..512e17e 100644
---
a/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/java/org/example/service/AgentService.java
+++
b/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/java/org/example/service/AgentService.java
@@ -313,7 +313,7 @@ public class AgentService {
};
//config rocketmq info
RocketMQTransportConfig rocketMQTransportConfig = new
RocketMQTransportConfig();
- rocketMQTransportConfig.setRocketMQNamespace(ROCKETMQ_NAMESPACE);
+ rocketMQTransportConfig.setNamespace(ROCKETMQ_NAMESPACE);
rocketMQTransportConfig.setAccessKey(accessKey);
rocketMQTransportConfig.setSecretKey(secretKey);
rocketMQTransportConfig.setWorkAgentResponseGroupID(WORK_AGENT_RESPONSE_GROUP_ID);
diff --git
a/example/rocketmq-multiagent-base-adk/SupervisorAgent/src/main/java/agent/SupervisorAgentA2ASDKMainStream.java
b/example/rocketmq-multiagent-base-adk/SupervisorAgent/src/main/java/agent/SupervisorAgentA2ASDKMainStream.java
index 199a1cd..e41a9b6 100644
---
a/example/rocketmq-multiagent-base-adk/SupervisorAgent/src/main/java/agent/SupervisorAgentA2ASDKMainStream.java
+++
b/example/rocketmq-multiagent-base-adk/SupervisorAgent/src/main/java/agent/SupervisorAgentA2ASDKMainStream.java
@@ -263,7 +263,7 @@ public class SupervisorAgentA2ASDKMainStream {
System.err.println("Streaming error occurred: " +
error.getMessage());
};
RocketMQTransportConfig rocketMQTransportConfig = new
RocketMQTransportConfig();
- rocketMQTransportConfig.setRocketMQNamespace(ROCKETMQ_NAMESPACE);
+ rocketMQTransportConfig.setNamespace(ROCKETMQ_NAMESPACE);
rocketMQTransportConfig.setAccessKey(accessKey);
rocketMQTransportConfig.setSecretKey(secretKey);
rocketMQTransportConfig.setWorkAgentResponseGroupID(WORK_AGENT_RESPONSE_GROUP_ID);
@@ -328,11 +328,8 @@ public class SupervisorAgentA2ASDKMainStream {
} catch (Exception e) {
System.out.println("解析过程出现异常");
}
- } else {
- //System.out.println(content);
}
} else {
- //System.out.println(content);
log.debug("Agent 响应: {}", content);
}
});
diff --git a/example/rocketmq-multiagent-base-adk/pom.xml
b/example/rocketmq-multiagent-base-adk/pom.xml
index 4860805..9810267 100644
--- a/example/rocketmq-multiagent-base-adk/pom.xml
+++ b/example/rocketmq-multiagent-base-adk/pom.xml
@@ -81,7 +81,7 @@
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-a2a</artifactId>
- <version>1.0.6</version>
+ <version>1.0.7</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
diff --git
a/src/main/java/org/apache/rocketmq/a2a/server/RocketMQA2AServerRoutes.java
b/src/main/java/org/apache/rocketmq/a2a/server/RocketMQA2AServerRoutes.java
index 8bed7c8..321bc68 100644
--- a/src/main/java/org/apache/rocketmq/a2a/server/RocketMQA2AServerRoutes.java
+++ b/src/main/java/org/apache/rocketmq/a2a/server/RocketMQA2AServerRoutes.java
@@ -97,8 +97,8 @@ public class RocketMQA2AServerRoutes extends A2AServerRoutes {
private static final String ROCKETMQ_NAMESPACE =
System.getProperty("rocketMQNamespace", "");
private static final String BIZ_TOPIC = System.getProperty("bizTopic", "");
private static final String BIZ_CONSUMER_GROUP =
System.getProperty("bizConsumerGroup", "");
- private static final String ACCESS_KEY = System.getProperty("rocketMQAk",
"");
- private static final String SECRET_KEY = System.getProperty("rocketMQSk",
"");
+ private static final String ACCESS_KEY = System.getProperty("rocketMQAK",
"");
+ private static final String SECRET_KEY = System.getProperty("rocketMQSK",
"");
@Inject
JSONRPCHandler jsonRpcHandler;
@@ -241,7 +241,7 @@ public class RocketMQA2AServerRoutes extends
A2AServerRoutes {
private static Message buildMessage(String topic, String liteTopic,
RocketMQResponse response) {
if (StringUtils.isEmpty(topic) || StringUtils.isEmpty(liteTopic)) {
- log.info("RocketMQA2AServerRoutes buildMessage param error, topic:
{}, liteTopic: {}, response: {}", topic, liteTopic,
JSON.toJSONString(response));
+ log.error("RocketMQA2AServerRoutes buildMessage param error,
topic: {}, liteTopic: {}, response: {}", topic, liteTopic,
JSON.toJSONString(response));
return null;
}
String missionJsonStr = JSON.toJSONString(response);
@@ -375,7 +375,7 @@ public class RocketMQA2AServerRoutes extends
A2AServerRoutes {
SendReceipt send =
producer.send(buildMessage(workAgentResponseTopic, liteTopic, response));
log.info("MultiSseSupport send response success,
msgId: {}, time: {}, response: {}", send.getMessageId(),
System.currentTimeMillis(), JSON.toJSONString(response));
} catch (ClientException e) {
- log.info("MultiSseSupport error send complete, msgId:
{}", e.getMessage());
+ log.error("MultiSseSupport error send complete, msgId:
{}", e.getMessage());
}
completableFuture.complete(true);
}
@@ -411,13 +411,13 @@ public class RocketMQA2AServerRoutes extends
A2AServerRoutes {
private void checkConfigParam() {
if (StringUtils.isEmpty(ROCKETMQ_ENDPOINT) ||
StringUtils.isEmpty(BIZ_TOPIC) || StringUtils.isEmpty(BIZ_CONSUMER_GROUP)) {
if (StringUtils.isEmpty(ROCKETMQ_ENDPOINT)) {
- log.info("rocketMQEndpoint is empty");
+ log.error("rocketMQEndpoint is empty");
}
if (StringUtils.isEmpty(BIZ_TOPIC)) {
- log.info("bizTopic is empty");
+ log.error("bizTopic is empty");
}
if (StringUtils.isEmpty(BIZ_CONSUMER_GROUP)) {
- log.info("bizConsumerGroup is empty");
+ log.error("bizConsumerGroup is empty");
}
throw new RuntimeException("RocketMQA2AServerRoutes check init
rocketmq param error, init failed!!!");
}
diff --git
a/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransport.java
b/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransport.java
index 821ba11..17baa24 100644
--- a/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransport.java
+++ b/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransport.java
@@ -117,8 +117,8 @@ public class RocketMQTransport implements ClientTransport {
private final String agentTopic;
private final String accessKey;
private final String secretKey;
- private final String rocketMQEndpoint;
- private final String rocketNamespace;
+ private final String endpoint;
+ private final String namespace;
private final String workAgentResponseTopic;
private final String workAgentResponseGroupID;
private final List<ClientCallInterceptor> interceptors;
@@ -131,7 +131,7 @@ public class RocketMQTransport implements ClientTransport {
private LitePushConsumer litePushConsumer;
private Producer producer;
- public RocketMQTransport(String rocketNamespace, String accessKey, String
secretKey, String workAgentResponseTopic, String workAgentResponseGroupID,
+ public RocketMQTransport(String namespace, String accessKey, String
secretKey, String workAgentResponseTopic, String workAgentResponseGroupID,
List<ClientCallInterceptor> interceptors, String agentUrl,
A2AHttpClient httpClient, String liteTopic, boolean useDefaultRecoverMode,
AgentCard agentCard) {
this.accessKey = accessKey;
this.secretKey = secretKey;
@@ -150,23 +150,23 @@ public class RocketMQTransport implements ClientTransport
{
if (null == rocketAgentCardInfo) {
throw new RuntimeException("RocketMQTransport rocketAgentCardInfo
pare error");
}
- if (null != rocketNamespace &&
!rocketNamespace.equals(rocketAgentCardInfo.getNamespace())) {
+ if (null != namespace &&
!namespace.equals(rocketAgentCardInfo.getNamespace())) {
throw new RuntimeException("RocketMQTransport rocketAgentCardInfo
namespace do not match, please check the config info");
}
- this.rocketMQEndpoint = rocketAgentCardInfo.getEndpoint();
+ this.endpoint = rocketAgentCardInfo.getEndpoint();
this.agentTopic = rocketAgentCardInfo.getTopic();
- this.rocketNamespace =
StringUtils.isEmpty(rocketAgentCardInfo.getNamespace()) ? "" :
rocketAgentCardInfo.getNamespace();
-
LITE_TOPIC_USE_DEFAULT_RECOVER_MAP.computeIfAbsent(this.rocketNamespace, k ->
new HashMap<>()).put(this.liteTopic, useDefaultRecoverMode);
+ this.namespace =
StringUtils.isEmpty(rocketAgentCardInfo.getNamespace()) ? "" :
rocketAgentCardInfo.getNamespace();
+ LITE_TOPIC_USE_DEFAULT_RECOVER_MAP.computeIfAbsent(this.namespace, k
-> new HashMap<>()).put(this.liteTopic, useDefaultRecoverMode);
checkConfigParam();
initRocketMQProducerAndConsumer();
}
private void initRocketMQProducerAndConsumer() {
- if (StringUtils.isEmpty(this.rocketMQEndpoint) ||
StringUtils.isEmpty(this.workAgentResponseTopic) ||
StringUtils.isEmpty(this.liteTopic)) {
+ if (StringUtils.isEmpty(this.endpoint) ||
StringUtils.isEmpty(this.workAgentResponseTopic) ||
StringUtils.isEmpty(this.liteTopic)) {
throw new A2AClientException("RocketMQTransport
initRocketMQProducerAndConsumer param error");
}
try {
- Map<String, LitePushConsumer> consumerMap =
ROCKETMQ_CONSUMER_MAP.computeIfAbsent(this.rocketNamespace, k -> new
HashMap<>());
+ Map<String, LitePushConsumer> consumerMap =
ROCKETMQ_CONSUMER_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
if (consumerMap.containsKey(this.workAgentResponseTopic)) {
this.litePushConsumer =
consumerMap.get(this.workAgentResponseTopic);
this.litePushConsumer.subscribeLite(this.liteTopic);
@@ -184,14 +184,14 @@ public class RocketMQTransport implements ClientTransport
{
this.litePushConsumer = litePushConsumer;
}
}
- Map<String, Producer> producerMap =
ROCKETMQ_PRODUCER_MAP.computeIfAbsent(this.rocketNamespace, k -> new
HashMap<>());
+ Map<String, Producer> producerMap =
ROCKETMQ_PRODUCER_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
if (!producerMap.containsKey(this.agentTopic)) {
this.producer = buildProducer(this.agentTopic);
producerMap.put(this.agentTopic, this.producer);
}
log.info("RocketMQTransport initRocketMQProducerAndConsumer
success");
} catch (Exception e) {
- log.info("RocketMQTransport initRocketMQProducerAndConsumer error:
{}", e.getMessage());
+ log.error("RocketMQTransport initRocketMQProducerAndConsumer
error: {}", e.getMessage());
}
}
@@ -210,7 +210,7 @@ public class RocketMQTransport implements ClientTransport {
log.error("RocketMQTransport sendMessage error,
responseMessageId is null");
return null;
}
- Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.rocketNamespace, k -> new
HashMap<>());
+ Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
CompletableFuture<String> objectCompletableFuture = new
CompletableFuture<>();
completableFutureMap.put(responseMessageId,
objectCompletableFuture);
String result = objectCompletableFuture.get(120, TimeUnit.SECONDS);
@@ -241,7 +241,7 @@ public class RocketMQTransport implements ClientTransport {
log.error("RocketMQTransport sendMessageStreaming error,
responseMessageId is null");
return;
}
- MESSAGE_STREAM_RESPONSE_MAP.computeIfAbsent(this.rocketNamespace,
k -> new HashMap<>()).put(responseMessageId, sseEventListener);
+ MESSAGE_STREAM_RESPONSE_MAP.computeIfAbsent(this.namespace, k ->
new HashMap<>()).put(responseMessageId, sseEventListener);
log.info("RocketMQTransport sendMessageStreaming success,
responseMessageId: {}", responseMessageId);
} catch (Exception e) {
throw new A2AClientException("RocketMQTransport Failed to send
streaming message request: " + e, e);
@@ -258,24 +258,24 @@ public class RocketMQTransport implements ClientTransport
{
if (null != request.metadata()) {
String responseMessageId =
(String)request.metadata().get(RocketMQA2AConstant.MESSAGE_RESPONSE_ID);
if (!StringUtils.isEmpty(responseMessageId)) {
-
MESSAGE_STREAM_RESPONSE_MAP.computeIfAbsent(this.rocketNamespace, k -> new
HashMap<>()).put(responseMessageId, sseEventListener);
+
MESSAGE_STREAM_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new
HashMap<>()).put(responseMessageId, sseEventListener);
}
String liteTopic =
(String)request.metadata().get(RocketMQA2AConstant.LITE_TOPIC);
if (null != litePushConsumer &&
!StringUtils.isEmpty(liteTopic)) {
litePushConsumer.subscribeLite(liteTopic);
log.info("litePushConsumer subscribeLite liteTopic: {}",
liteTopic);
-
LITE_TOPIC_USE_DEFAULT_RECOVER_MAP.computeIfAbsent(this.rocketNamespace, k ->
new HashMap<>()).put(liteTopic, this.useDefaultRecoverMode);
+
LITE_TOPIC_USE_DEFAULT_RECOVER_MAP.computeIfAbsent(this.namespace, k -> new
HashMap<>()).put(liteTopic, this.useDefaultRecoverMode);
}
String closeLiteTopic =
(String)request.metadata().get(RocketMQA2AConstant.CLOSE_LITE_TOPIC);
if (null != litePushConsumer &&
!StringUtils.isEmpty(closeLiteTopic)) {
litePushConsumer.unsubscribeLite(closeLiteTopic);
log.info("litePushConsumer unsubscribeLite liteTopic: {}",
closeLiteTopic);
-
LITE_TOPIC_USE_DEFAULT_RECOVER_MAP.computeIfAbsent(this.rocketNamespace, k ->
new HashMap<>()).remove(closeLiteTopic);
+
LITE_TOPIC_USE_DEFAULT_RECOVER_MAP.computeIfAbsent(this.namespace, k -> new
HashMap<>()).remove(closeLiteTopic);
}
}
if (this.useDefaultRecoverMode) {
-
RECOVER_MESSAGE_STREAM_RESPONSE_MAP.computeIfAbsent(rocketNamespace, k -> new
HashMap<>()).put(RocketMQA2AConstant.DEFAULT_STREAM_RECOVER, sseEventListener);
+ RECOVER_MESSAGE_STREAM_RESPONSE_MAP.computeIfAbsent(namespace,
k -> new HashMap<>()).put(RocketMQA2AConstant.DEFAULT_STREAM_RECOVER,
sseEventListener);
}
} catch (Exception e) {
throw new A2AClientException("RocketMQTransport failed to
resubscribe streaming message request: " + e, e);
@@ -296,7 +296,7 @@ public class RocketMQTransport implements ClientTransport {
log.error("RocketMQTransport getTask error, responseMessageId
is null");
return null;
}
- Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.rocketNamespace, k -> new
HashMap<>());
+ Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
CompletableFuture<String> objectCompletableFuture = new
CompletableFuture<>();
completableFutureMap.put(responseMessageId,
objectCompletableFuture);
String result = objectCompletableFuture.get(120, TimeUnit.SECONDS);
@@ -324,7 +324,7 @@ public class RocketMQTransport implements ClientTransport {
log.error("RocketMQTransport cancelTask error,
responseMessageId is null");
return null;
}
- Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.rocketNamespace, k -> new
HashMap<>());
+ Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
CompletableFuture<String> objectCompletableFuture = new
CompletableFuture<>();
completableFutureMap.put(responseMessageId,
objectCompletableFuture);
String result = objectCompletableFuture.get(120, TimeUnit.SECONDS);
@@ -353,7 +353,7 @@ public class RocketMQTransport implements ClientTransport {
log.error("RocketMQTransport
setTaskPushNotificationConfiguration error, responseMessageId is null");
return null;
}
- Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.rocketNamespace, k -> new
HashMap<>());
+ Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
CompletableFuture<String> objectCompletableFuture = new
CompletableFuture<>();
completableFutureMap.put(responseMessageId,
objectCompletableFuture);
String result = objectCompletableFuture.get(120, TimeUnit.SECONDS);
@@ -383,7 +383,7 @@ public class RocketMQTransport implements ClientTransport {
log.error("RocketMQTransport
getTaskPushNotificationConfiguration error, responseMessageId is null");
return null;
}
- Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.rocketNamespace, k -> new
HashMap<>());
+ Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
CompletableFuture<String> completableFuture = new
CompletableFuture<>();
completableFutureMap.put(responseMessageId, completableFuture);
String result = completableFuture.get(120, TimeUnit.SECONDS);
@@ -411,7 +411,7 @@ public class RocketMQTransport implements ClientTransport {
log.error("RocketMQTransport
listTaskPushNotificationConfigurations error, responseMessageId is null");
return null;
}
- Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.rocketNamespace, k -> new
HashMap<>());
+ Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
CompletableFuture<String> objectCompletableFuture = new
CompletableFuture<>();
completableFutureMap.put(responseMessageId,
objectCompletableFuture);
String result = objectCompletableFuture.get(120, TimeUnit.SECONDS);
@@ -439,7 +439,7 @@ public class RocketMQTransport implements ClientTransport {
log.error("RocketMQTransport
deleteTaskPushNotificationConfigurations error, responseMessageId is null");
return;
}
- Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.rocketNamespace, k -> new
HashMap<>());
+ Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
CompletableFuture<String> objectCompletableFuture = new
CompletableFuture<>();
completableFutureMap.put(responseMessageId,
objectCompletableFuture);
objectCompletableFuture.get(120, TimeUnit.SECONDS);
@@ -472,7 +472,7 @@ public class RocketMQTransport implements ClientTransport {
log.error("RocketMQTransport getAgentCard
responseMessageId is null");
return null;
}
- Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.rocketNamespace, k -> new
HashMap<>());
+ Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
CompletableFuture<String> objectCompletableFuture = new
CompletableFuture<>();
completableFutureMap.put(responseMessageId,
objectCompletableFuture);
String result = objectCompletableFuture.get(120,
TimeUnit.SECONDS);
@@ -503,23 +503,23 @@ public class RocketMQTransport implements ClientTransport
{
}
private void checkConfigParam() {
- if (StringUtils.isEmpty(this.rocketMQEndpoint) ||
StringUtils.isEmpty(this.workAgentResponseTopic) ||
+ if (StringUtils.isEmpty(this.endpoint) ||
StringUtils.isEmpty(this.workAgentResponseTopic) ||
StringUtils.isEmpty(this.workAgentResponseGroupID) ||
StringUtils.isEmpty(this.liteTopic) || StringUtils.isEmpty(agentTopic)) {
- if (StringUtils.isEmpty(this.rocketMQEndpoint)) {
- log.info("RocketMQTransport checkConfigParam rocketMQEndpoint
is empty");
+ if (StringUtils.isEmpty(this.endpoint)) {
+ log.error("RocketMQTransport checkConfigParam endpoint is
empty");
}
if (StringUtils.isEmpty(this.workAgentResponseTopic)) {
- log.info("RocketMQTransport checkConfigParam
workAgentResponseTopic is empty");
+ log.error("RocketMQTransport checkConfigParam
workAgentResponseTopic is empty");
}
if (StringUtils.isEmpty(this.workAgentResponseGroupID)) {
- log.info("RocketMQTransport checkConfigParam
workAgentResponseGroupID is empty");
+ log.error("RocketMQTransport checkConfigParam
workAgentResponseGroupID is empty");
}
if (StringUtils.isEmpty(this.liteTopic)) {
- log.info("RocketMQTransport checkConfigParam liteTopic is
empty");
+ log.error("RocketMQTransport checkConfigParam liteTopic is
empty");
}
if (StringUtils.isEmpty(this.agentTopic)) {
- log.info("RocketMQTransport checkConfigParam agentTopic is
empty");
+ log.error("RocketMQTransport checkConfigParam agentTopic is
empty");
}
throw new RuntimeException("RocketMQTransport checkConfigParam
error, init failed !!!");
}
@@ -539,15 +539,15 @@ public class RocketMQTransport implements ClientTransport
{
}
private LitePushConsumer buildConsumer() throws ClientException {
- if (StringUtils.isEmpty(this.rocketMQEndpoint) ||
StringUtils.isEmpty(this.workAgentResponseGroupID) ||
StringUtils.isEmpty(this.workAgentResponseTopic)) {
+ if (StringUtils.isEmpty(this.endpoint) ||
StringUtils.isEmpty(this.workAgentResponseGroupID) ||
StringUtils.isEmpty(this.workAgentResponseTopic)) {
log.error("RocketMQTransport buildConsumer check param error");
return null;
}
final ClientServiceProvider provider =
ClientServiceProvider.loadService();
SessionCredentialsProvider sessionCredentialsProvider = new
StaticSessionCredentialsProvider(accessKey, secretKey);
ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder()
- .setEndpoints(this.rocketMQEndpoint)
- .setNamespace(this.rocketNamespace)
+ .setEndpoints(this.endpoint)
+ .setNamespace(this.namespace)
.setCredentialProvider(sessionCredentialsProvider)
.build();
LitePushConsumer litePushConsumer =
provider.newLitePushConsumerBuilder()
@@ -571,9 +571,9 @@ public class RocketMQTransport implements ClientTransport {
return ConsumeResult.SUCCESS;
}
if (!response.isStream()) {
- return dealNonStreamResult(response,
this.rocketNamespace);
+ return dealNonStreamResult(response, this.namespace);
}
- return dealStreamResult(response, this.rocketNamespace,
liteTopic);
+ return dealStreamResult(response, this.namespace,
liteTopic);
} catch (Exception e) {
log.error("RocketMQTransport litePushConsumer consumer
error, msgId: {}, error: {}", messageView.getMessageId(), e.getMessage());
return ConsumeResult.SUCCESS;
@@ -582,20 +582,20 @@ public class RocketMQTransport implements ClientTransport
{
return litePushConsumer;
}
- private ConsumeResult dealStreamResult(RocketMQResponse response, String
rocketMQNamespace, String liteTopic) {
+ private ConsumeResult dealStreamResult(RocketMQResponse response, String
namespace, String liteTopic) {
if (null == response || StringUtils.isEmpty(response.getMessageId())
|| StringUtils.isEmpty(liteTopic) || !response.isEnd() &&
StringUtils.isEmpty(response.getResponseBody())) {
log.error("RocketMQTransport dealStreamResult param is error,
response: {}, liteTopic: {}", JSON.toJSONString(response), liteTopic);
return ConsumeResult.SUCCESS;
}
- Map<String, SSEEventListener> sseEventListenerMap =
MESSAGE_STREAM_RESPONSE_MAP.get(rocketMQNamespace);
+ Map<String, SSEEventListener> sseEventListenerMap =
MESSAGE_STREAM_RESPONSE_MAP.get(namespace);
if (null == sseEventListenerMap) {
log.error("RocketMQTransport dealStreamResult sseEventListenerMap
is null");
return ConsumeResult.SUCCESS;
}
SSEEventListener sseEventListener =
sseEventListenerMap.get(response.getMessageId());
if (null == sseEventListener) {
- Map<String, Boolean> booleanMap =
LITE_TOPIC_USE_DEFAULT_RECOVER_MAP.get(rocketMQNamespace);
+ Map<String, Boolean> booleanMap =
LITE_TOPIC_USE_DEFAULT_RECOVER_MAP.get(namespace);
if (null == booleanMap) {
log.error("RocketMQTransport dealStreamResult booleanMap is
null");
return ConsumeResult.SUCCESS;
@@ -604,8 +604,8 @@ public class RocketMQTransport implements ClientTransport {
if (null == useDefaultRecoverModeConsumer ||
!useDefaultRecoverModeConsumer) {
return ConsumeResult.SUCCESS;
}
- if (!RECOVER_MESSAGE_STREAM_RESPONSE_MAP.isEmpty() &&
RECOVER_MESSAGE_STREAM_RESPONSE_MAP.containsKey(rocketMQNamespace)) {
- Map<String, SSEEventListener> sseEventListenerMapRecover =
RECOVER_MESSAGE_STREAM_RESPONSE_MAP.get(rocketMQNamespace);
+ if (!RECOVER_MESSAGE_STREAM_RESPONSE_MAP.isEmpty() &&
RECOVER_MESSAGE_STREAM_RESPONSE_MAP.containsKey(namespace)) {
+ Map<String, SSEEventListener> sseEventListenerMapRecover =
RECOVER_MESSAGE_STREAM_RESPONSE_MAP.get(namespace);
if (null == sseEventListenerMapRecover) {
log.error("RocketMQTransport dealStreamResult
sseEventListenerMapRecover is null");
return ConsumeResult.SUCCESS;
@@ -638,12 +638,12 @@ public class RocketMQTransport implements ClientTransport
{
return ConsumeResult.SUCCESS;
}
- private ConsumeResult dealNonStreamResult(RocketMQResponse response,
String rocketMQNamespace) {
+ private ConsumeResult dealNonStreamResult(RocketMQResponse response,
String namespace) {
if (null == response || StringUtils.isEmpty(response.getMessageId())
|| StringUtils.isEmpty(response.getResponseBody())) {
log.error("RocketMQTransport dealNonStreamResult param is error,
response: {}", JSON.toJSONString(response));
return ConsumeResult.SUCCESS;
}
- Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.get(rocketMQNamespace);
+ Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.get(namespace);
if (null != completableFutureMap &&
completableFutureMap.containsKey(response.getMessageId())) {
CompletableFuture<String> completableFuture =
completableFutureMap.get(response.getMessageId());
completableFuture.complete(response.getResponseBody());
@@ -715,8 +715,8 @@ public class RocketMQTransport implements ClientTransport {
final ClientServiceProvider provider =
ClientServiceProvider.loadService();
SessionCredentialsProvider sessionCredentialsProvider = new
StaticSessionCredentialsProvider(accessKey, secretKey);
ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder()
- .setEndpoints(this.rocketMQEndpoint)
- .setNamespace(this.rocketNamespace)
+ .setEndpoints(this.endpoint)
+ .setNamespace(this.namespace)
.setCredentialProvider(sessionCredentialsProvider)
.setRequestTimeout(Duration.ofSeconds(15))
.build();
@@ -742,7 +742,7 @@ public class RocketMQTransport implements ClientTransport {
private RocketMQResourceInfo parseAgentCardAddition(AgentCard agentCard) {
if (null == agentCard ||
StringUtils.isEmpty(agentCard.preferredTransport()) ||
StringUtils.isEmpty(agentCard.url()) || null ==
agentCard.additionalInterfaces() || agentCard.additionalInterfaces().isEmpty())
{
- log.info("parseAgentCardAddition param error, agentCard: {}",
JSON.toJSONString(agentCard));
+ log.error("parseAgentCardAddition param error, agentCard: {}",
JSON.toJSONString(agentCard));
return null;
}
RocketMQResourceInfo rocketMQResourceInfo = null;
@@ -750,8 +750,7 @@ public class RocketMQTransport implements ClientTransport {
if (RocketMQA2AConstant.ROCKETMQ_PROTOCOL.equals(preferredTransport)) {
String url = agentCard.url();
rocketMQResourceInfo = pareAgentCardUrl(url);
- if (null != rocketMQResourceInfo &&
!StringUtils.isEmpty(rocketMQResourceInfo.getEndpoint()) &&
!StringUtils.isEmpty(
- rocketMQResourceInfo.getTopic())) {
+ if (null != rocketMQResourceInfo &&
!StringUtils.isEmpty(rocketMQResourceInfo.getEndpoint()) &&
!StringUtils.isEmpty(rocketMQResourceInfo.getTopic())) {
log.info("RocketMQTransport get rocketMQResourceInfo from
preferredTransport");
return rocketMQResourceInfo;
}
@@ -762,9 +761,8 @@ public class RocketMQTransport implements ClientTransport {
if (!StringUtils.isEmpty(transport) &&
RocketMQA2AConstant.ROCKETMQ_PROTOCOL.equals(transport)) {
String url = agentInterface.url();
rocketMQResourceInfo = pareAgentCardUrl(url);
- if (null != rocketMQResourceInfo &&
!StringUtils.isEmpty(rocketMQResourceInfo.getEndpoint()) &&
!StringUtils.isEmpty(
- rocketMQResourceInfo.getTopic())) {
- log.info("RocketMQTransport get rocketMQResourceInfo from
additionalInterfaces");
+ if (null != rocketMQResourceInfo &&
!StringUtils.isEmpty(rocketMQResourceInfo.getEndpoint()) &&
!StringUtils.isEmpty(rocketMQResourceInfo.getTopic())) {
+ log.error("RocketMQTransport get rocketMQResourceInfo from
additionalInterfaces");
return rocketMQResourceInfo;
}
}
diff --git
a/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransportConfig.java
b/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransportConfig.java
index 49757c6..bdae321 100644
---
a/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransportConfig.java
+++
b/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransportConfig.java
@@ -22,8 +22,8 @@ import io.a2a.client.transport.spi.ClientTransportConfig;
public class RocketMQTransportConfig extends
ClientTransportConfig<RocketMQTransport> {
private String accessKey;
private String secretKey;
- private String globalEndpoint;
- private String rocketMQNamespace;
+ private String endpoint;
+ private String namespace;
private String workAgentResponseTopic;
private String workAgentResponseGroupID;
private String agentTopic;
@@ -31,24 +31,24 @@ public class RocketMQTransportConfig extends
ClientTransportConfig<RocketMQTrans
private String liteTopic;
private boolean useDefaultRecoverMode = false;
- public RocketMQTransportConfig(String accessKey, String secretKey, String
globalEndpoint, String rocketMQNamespace,
+ public RocketMQTransportConfig(String accessKey, String secretKey, String
endpoint, String namespace,
String workAgentResponseTopic, String workAgentResponseGroupID, String
agentTopic, A2AHttpClient httpClient) {
this.accessKey = accessKey;
this.secretKey = secretKey;
- this.globalEndpoint = globalEndpoint;
- this.rocketMQNamespace = rocketMQNamespace;
+ this.endpoint = endpoint;
+ this.namespace = namespace;
this.workAgentResponseTopic = workAgentResponseTopic;
this.workAgentResponseGroupID = workAgentResponseGroupID;
this.agentTopic = agentTopic;
this.httpClient = httpClient;
}
- public RocketMQTransportConfig(String accessKey, String secretKey, String
globalEndpoint, String rocketMQNamespace,
+ public RocketMQTransportConfig(String accessKey, String secretKey, String
endpoint, String namespace,
String workAgentResponseTopic, String workAgentResponseGroupID, String
agentTopic, A2AHttpClient httpClient, String liteTopic, boolean
useDefaultRecoverMode) {
this.accessKey = accessKey;
this.secretKey = secretKey;
- this.globalEndpoint = globalEndpoint;
- this.rocketMQNamespace = rocketMQNamespace;
+ this.endpoint = endpoint;
+ this.namespace = namespace;
this.workAgentResponseTopic = workAgentResponseTopic;
this.workAgentResponseGroupID = workAgentResponseGroupID;
this.agentTopic = agentTopic;
@@ -57,11 +57,11 @@ public class RocketMQTransportConfig extends
ClientTransportConfig<RocketMQTrans
this.useDefaultRecoverMode = useDefaultRecoverMode;
}
- public RocketMQTransportConfig(String accessKey, String secretKey, String
globalEndpoint, String rocketMQNamespace, String agentTopic, A2AHttpClient
httpClient) {
+ public RocketMQTransportConfig(String accessKey, String secretKey, String
endpoint, String namespace, String agentTopic, A2AHttpClient httpClient) {
this.accessKey = accessKey;
this.secretKey = secretKey;
- this.globalEndpoint = globalEndpoint;
- this.rocketMQNamespace = rocketMQNamespace;
+ this.endpoint = endpoint;
+ this.namespace = namespace;
this.agentTopic = agentTopic;
this.httpClient = httpClient;
}
@@ -88,20 +88,20 @@ public class RocketMQTransportConfig extends
ClientTransportConfig<RocketMQTrans
this.secretKey = secretKey;
}
- public String getGlobalEndpoint() {
- return globalEndpoint;
+ public String getEndpoint() {
+ return endpoint;
}
- public void setGlobalEndpoint(String globalEndpoint) {
- this.globalEndpoint = globalEndpoint;
+ public void setEndpoint(String endpoint) {
+ this.endpoint = endpoint;
}
- public String getRocketMQNamespace() {
- return rocketMQNamespace;
+ public String getNamespace() {
+ return namespace;
}
- public void setRocketMQNamespace(String rocketMQNamespace) {
- this.rocketMQNamespace = rocketMQNamespace;
+ public void setNamespace(String namespace) {
+ this.namespace = namespace;
}
public String getWorkAgentResponseTopic() {
diff --git
a/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransportProvider.java
b/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransportProvider.java
index 2b98291..ddf6ca5 100644
---
a/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransportProvider.java
+++
b/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransportProvider.java
@@ -30,7 +30,7 @@ public class RocketMQTransportProvider implements
ClientTransportProvider<Rocket
if (clientTransportConfig == null) {
clientTransportConfig = new RocketMQTransportConfig(new
JdkA2AHttpClient());
}
- return new
RocketMQTransport(clientTransportConfig.getRocketMQNamespace(),
clientTransportConfig.getAccessKey(), clientTransportConfig.getSecretKey(),
clientTransportConfig.getWorkAgentResponseTopic(),
clientTransportConfig.getWorkAgentResponseGroupID(),
clientTransportConfig.getInterceptors(), clientTransportConfig.getAgentUrl(),
clientTransportConfig.getHttpClient(), clientTransportConfig.getLiteTopic(),
clientTransportConfig.isUseDefaultRecoverMode(), agentCard);
+ return new RocketMQTransport(clientTransportConfig.getNamespace(),
clientTransportConfig.getAccessKey(), clientTransportConfig.getSecretKey(),
clientTransportConfig.getWorkAgentResponseTopic(),
clientTransportConfig.getWorkAgentResponseGroupID(),
clientTransportConfig.getInterceptors(), clientTransportConfig.getAgentUrl(),
clientTransportConfig.getHttpClient(), clientTransportConfig.getLiteTopic(),
clientTransportConfig.isUseDefaultRecoverMode(), agentCard);
}
@Override