This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-a2a.git
The following commit(s) were added to refs/heads/main by this push:
new 4dc8133 Optimize the code (#6)
4dc8133 is described below
commit 4dc8133b1ab3b64fa043976ebe2902b8bb55a5d1
Author: Drizzle <[email protected]>
AuthorDate: Fri Dec 12 10:11:09 2025 +0800
Optimize the code (#6)
* update
Change-Id: I2b5f6364699c1c9e07b6ad381b801c49ce641559
* optimize the code
Change-Id: If2495360c76970ac88387e59e3ac15a6237e2f3e
* update
Change-Id: Iceef7a1bd1d05bfa498694ba7c3a007bbfdf378e
* update
Change-Id: Id0b75d941f02387d7b05a96731ea936526bae012
* add spring web demo
Change-Id: I74483972366bd35fee4a8e6c58b928204b3669ed
* update example
Change-Id: I00ec96acdbf1b7b5aabad5d28b10667b605635e1
* update
Change-Id: I0f51bf41c079d782a1a3adbbd388a32a50170980
* update
Change-Id: Id0e84eb0f4aa59d7aa020b36a7de85a8583f142d
* update demo
Change-Id: Ic7af56d1b946b119816848725465b60335bfc9ee
* optimize the code
Change-Id: Ie45c5bbb6d27af2dd2bd2c66e5b0ee18faa91a41
---------
Co-authored-by: drizzle.zk <[email protected]>
---
README.md | 2 +-
example/rocketmq-multiagent-base-adk/README.md | 8 +-
.../SupervisorAgent-Web/pom.xml | 4 +-
.../java/org/example/service/AgentService.java | 19 +--
.../src/main/resources/static/index.html | 1 -
.../agent/SupervisorAgentA2ASDKMainStream.java | 17 +--
.../src/main/java/agent/AgentCardProducer.java | 6 +-
.../src/main/java/agent/AgentCardProducer.java | 6 +-
example/rocketmq-multiagent-base-adk/pom.xml | 2 +-
pom.xml | 2 +-
.../a2a/server/RocketMQA2AServerRoutes.java | 18 +--
.../rocketmq/a2a/transport/RocketMQTransport.java | 156 ++++++++++-----------
.../a2a/transport/RocketMQTransportConfig.java | 22 +--
.../a2a/transport/RocketMQTransportProvider.java | 2 +-
14 files changed, 114 insertions(+), 151 deletions(-)
diff --git a/README.md b/README.md
index 55d8193..ce1dcfe 100644
--- a/README.md
+++ b/README.md
@@ -52,7 +52,7 @@ Create an A2A Client Using RocketMQTransport and
RocketMQTransportConfig
rocketMQTransportConfig.setSecretKey(secretKey);
rocketMQTransportConfig.setWorkAgentResponseGroupID(WorkAgentResponseGroupID);
rocketMQTransportConfig.setWorkAgentResponseTopic(WorkAgentResponseTopic);
- rocketMQTransportConfig.setRocketMQInstanceID(RocketMQInstanceId);
+ rocketMQTransportConfig.setRocketMQNamespace(RocketMQNamespace);
Client client=Client.builder(finalAgentCard)
.addConsumers(consumers)
.streamingErrorHandler(streamingErrorHandler)
diff --git a/example/rocketmq-multiagent-base-adk/README.md
b/example/rocketmq-multiagent-base-adk/README.md
index 218f7c0..f0383a6 100644
--- a/example/rocketmq-multiagent-base-adk/README.md
+++ b/example/rocketmq-multiagent-base-adk/README.md
@@ -44,7 +44,7 @@ cd WeatherAgent
```
```shell
-MAVEN_OPTS="-DrocketMQEndpoint= -DrocketMQInstanceID=
-DbizTopic=WeatherAgentTask -DbizConsumerGroup=WeatherAgentTaskConsumerGroup
-DrocketMQAk= -DrocketMQSk= -DapiKey= -DappId= " mvn quarkus:dev
+MAVEN_OPTS="-DrocketMQEndpoint= -DrocketMQNamespace=
-DbizTopic=WeatherAgentTask -DbizConsumerGroup=WeatherAgentTaskConsumerGroup
-DrocketMQAk= -DrocketMQSk= -DapiKey= -DappId= " mvn quarkus:dev
```

@@ -54,7 +54,7 @@ cd TravelAgent
```
```shell
- MAVEN_OPTS="-DrocketMQEndpoint= -DrocketMQInstanceID=
-DbizTopic=TravelAgentTask -DbizConsumerGroup=TravelAgentTaskConsumerGroup
-DrocketMQAk= -DrocketMQSk= -DapiKey= -DappId= " mvn quarkus:dev
+ MAVEN_OPTS="-DrocketMQEndpoint= -DrocketMQNamespace=
-DbizTopic=TravelAgentTask -DbizConsumerGroup=TravelAgentTaskConsumerGroup
-DrocketMQAk= -DrocketMQSk= -DapiKey= -DappId= " mvn quarkus:dev
```

@@ -63,7 +63,7 @@ cd TravelAgent
cd SupervisorAgent/target
```
```shell
-java -DrocketMQInstanceID= -DworkAgentResponseTopic=WorkerAgentResponse
-DworkAgentResponseGroupID=CID_HOST_AGENT_LITE -DapiKey=
-DweatherAgentTaskTopic=WeatherAgentTask -DtravelAgentTaskTopic=TravelAgentTask
-DrocketMQAK= -DrocketMQSK= -jar
SupervisorAgent-2.1.1-SNAPSHOT-jar-with-dependencies.jar
+java -DrocketMQNamespace= -DworkAgentResponseTopic=WorkerAgentResponse
-DworkAgentResponseGroupID=CID_HOST_AGENT_LITE -DapiKey=
-DweatherAgentTaskTopic=WeatherAgentTask -DtravelAgentTaskTopic=TravelAgentTask
-DrocketMQAK= -DrocketMQSK= -jar
SupervisorAgent-2.1.1-SNAPSHOT-jar-with-dependencies.jar
```

@@ -74,7 +74,7 @@ cd SupervisorAgent-Web/target
```
```shell
-java -DrocketMQInstanceID= -DworkAgentResponseTopic=WorkerAgentResponse
-DworkAgentResponseGroupID=CID_HOST_AGENT_LITE -DapiKey=
-DweatherAgentTaskTopic=WeatherAgentTask -DtravelAgentTaskTopic=TravelAgentTask
-DrocketMQAK= -DrocketMQSK= -jar SupervisorAgent-Web-1.0.3-SNAPSHOT.jar
+java -DrocketMQNamespace= -DworkAgentResponseTopic=WorkerAgentResponse
-DworkAgentResponseGroupID=CID_HOST_AGENT_LITE -DapiKey=
-DweatherAgentTaskTopic=WeatherAgentTask -DtravelAgentTaskTopic=TravelAgentTask
-DrocketMQAK= -DrocketMQSK= -jar SupervisorAgent-Web-2.1.1-SNAPSHOT.jar
```
- 打开浏览器,访问 localhost:9090
- 下面的示例展示了以RocketMQ作为底层Transport过程中实现异步通信以及断点重传功能
diff --git a/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/pom.xml
b/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/pom.xml
index 18556fc..284f0f5 100644
--- a/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/pom.xml
+++ b/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/pom.xml
@@ -27,7 +27,7 @@
</parent>
<artifactId>SupervisorAgent-Web</artifactId>
- <version>1.0.3-SNAPSHOT</version>
+ <version>2.1.1-SNAPSHOT</version>
<properties>
<maven.compiler.source>17</maven.compiler.source>
@@ -119,7 +119,7 @@
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-a2a</artifactId>
- <version>1.0.5</version>
+ <version>1.0.6</version>
</dependency>
</dependencies>
diff --git
a/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/java/org/example/service/AgentService.java
b/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/java/org/example/service/AgentService.java
index 090c648..ad19009 100644
---
a/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/java/org/example/service/AgentService.java
+++
b/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/java/org/example/service/AgentService.java
@@ -77,7 +77,7 @@ public class AgentService {
private static final String TRAVEL_AGENT_URL = "http://localhost:8888";
private static final String WORK_AGENT_RESPONSE_TOPIC =
System.getProperty("workAgentResponseTopic");
private static final String WORK_AGENT_RESPONSE_GROUP_ID =
System.getProperty("workAgentResponseGroupID");
- private static final String ROCKETMQ_INSTANCE_ID =
System.getProperty("rocketMQInstanceID");
+ private static final String ROCKETMQ_NAMESPACE =
System.getProperty("rocketMQNamespace");
private static final String ACCESS_KEY = System.getProperty("rocketMQAK");
private static final String SECRET_KEY = System.getProperty("rocketMQSK");
private static final String API_KEY = System.getProperty("apiKey");
@@ -106,22 +106,13 @@ public class AgentService {
}
private static boolean checkConfigParam() {
- if (StringUtils.isEmpty(ROCKETMQ_INSTANCE_ID) ||
StringUtils.isEmpty(WORK_AGENT_RESPONSE_TOPIC) ||
StringUtils.isEmpty(WORK_AGENT_RESPONSE_GROUP_ID) ||
StringUtils.isEmpty(ACCESS_KEY) || StringUtils.isEmpty(SECRET_KEY) ||
StringUtils.isEmpty(API_KEY)) {
- if (StringUtils.isEmpty(ROCKETMQ_INSTANCE_ID)) {
- log.error("请配置RocketMQ 的实例信息 rocketMQInstanceID");
- }
+ if (StringUtils.isEmpty(WORK_AGENT_RESPONSE_TOPIC) ||
StringUtils.isEmpty(WORK_AGENT_RESPONSE_GROUP_ID) ||
StringUtils.isEmpty(API_KEY)) {
if (StringUtils.isEmpty(WORK_AGENT_RESPONSE_TOPIC)) {
log.error("请配置RocketMQ 的轻量消息Topic workAgentResponseTopic");
}
if (StringUtils.isEmpty(WORK_AGENT_RESPONSE_GROUP_ID)) {
log.error("请配置RocketMQ 的轻量消息消费者 workAgentResponseGroupID");
}
- if (StringUtils.isEmpty(ACCESS_KEY)) {
- log.error("请配置RocketMQ 的访问控制-用户名 rocketMQAK");
- }
- if (StringUtils.isEmpty(SECRET_KEY)) {
- log.error("请配置RocketMQ 的访问控制-密码 rocketMQSK");
- }
if (StringUtils.isEmpty(API_KEY)) {
log.error("请配置SupervisorAgent qwen-plus apiKey");
}
@@ -279,8 +270,8 @@ public class AgentService {
}
private void initAgentCardInfo(String accessKey, String secretKey, String
agentName, String agentUrl) {
- if (StringUtils.isEmpty(accessKey) || StringUtils.isEmpty(secretKey)
|| StringUtils.isEmpty(agentName) || StringUtils.isEmpty(agentUrl)) {
- log.error("initAgentCardInfo param error, accessKey: {},
secretKey: {}, agentName: {}, agentUrl: {}", accessKey, secretKey, agentName,
agentUrl);
+ if (StringUtils.isEmpty(agentName) || StringUtils.isEmpty(agentUrl)) {
+ log.error("initAgentCardInfo param error, agentName: {}, agentUrl:
{}", agentName, agentUrl);
return;
}
AgentCard finalAgentCard = new
A2ACardResolver(agentUrl).getAgentCard();
@@ -322,7 +313,7 @@ public class AgentService {
};
//config rocketmq info
RocketMQTransportConfig rocketMQTransportConfig = new
RocketMQTransportConfig();
- rocketMQTransportConfig.setRocketMQInstanceID(ROCKETMQ_INSTANCE_ID);
+ rocketMQTransportConfig.setRocketMQNamespace(ROCKETMQ_NAMESPACE);
rocketMQTransportConfig.setAccessKey(accessKey);
rocketMQTransportConfig.setSecretKey(secretKey);
rocketMQTransportConfig.setWorkAgentResponseGroupID(WORK_AGENT_RESPONSE_GROUP_ID);
diff --git
a/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/resources/static/index.html
b/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/resources/static/index.html
index cb4f9a4..ee6528a 100644
---
a/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/resources/static/index.html
+++
b/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/resources/static/index.html
@@ -909,7 +909,6 @@
this.userIdInput.disabled = true;
this.sessionIdInput.disabled = true;
this.confirmButton.disabled = true;
- // 发起 resubscribeMessage 请求
showToast("登录成功")
}
diff --git
a/example/rocketmq-multiagent-base-adk/SupervisorAgent/src/main/java/agent/SupervisorAgentA2ASDKMainStream.java
b/example/rocketmq-multiagent-base-adk/SupervisorAgent/src/main/java/agent/SupervisorAgentA2ASDKMainStream.java
index 1981e64..199a1cd 100644
---
a/example/rocketmq-multiagent-base-adk/SupervisorAgent/src/main/java/agent/SupervisorAgentA2ASDKMainStream.java
+++
b/example/rocketmq-multiagent-base-adk/SupervisorAgent/src/main/java/agent/SupervisorAgentA2ASDKMainStream.java
@@ -70,7 +70,7 @@ public class SupervisorAgentA2ASDKMainStream {
private static final String TRAVEL_AGENT_URL = "http://localhost:8888";
private static final String WORK_AGENT_RESPONSE_TOPIC =
System.getProperty("workAgentResponseTopic");
private static final String WORK_AGENT_RESPONSE_GROUP_ID =
System.getProperty("workAgentResponseGroupID");
- private static final String ROCKETMQ_INSTANCE_ID =
System.getProperty("rocketMQInstanceID");
+ private static final String ROCKETMQ_NAMESPACE =
System.getProperty("rocketMQNamespace");
private static final String ACCESS_KEY = System.getProperty("rocketMQAK");
private static final String SECRET_KEY = System.getProperty("rocketMQSK");
private static final String API_KEY = System.getProperty("apiKey");
@@ -132,22 +132,13 @@ public class SupervisorAgentA2ASDKMainStream {
}
private static boolean checkConfigParam() {
- if (StringUtils.isEmpty(ROCKETMQ_INSTANCE_ID) ||
StringUtils.isEmpty(WORK_AGENT_RESPONSE_TOPIC) ||
StringUtils.isEmpty(WORK_AGENT_RESPONSE_GROUP_ID) ||
StringUtils.isEmpty(ACCESS_KEY) || StringUtils.isEmpty(SECRET_KEY) ||
StringUtils.isEmpty(API_KEY)) {
- if (StringUtils.isEmpty(ROCKETMQ_INSTANCE_ID)) {
- System.out.println("请配置RocketMQ 的实例信息 rocketMQInstanceID");
- }
+ if (StringUtils.isEmpty(WORK_AGENT_RESPONSE_TOPIC) ||
StringUtils.isEmpty(WORK_AGENT_RESPONSE_GROUP_ID) ||
StringUtils.isEmpty(API_KEY)) {
if (StringUtils.isEmpty(WORK_AGENT_RESPONSE_TOPIC)) {
System.out.println("请配置RocketMQ 的轻量消息Topic
workAgentResponseTopic");
}
if (StringUtils.isEmpty(WORK_AGENT_RESPONSE_GROUP_ID)) {
System.out.println("请配置RocketMQ 的轻量消息消费者
workAgentResponseGroupID");
}
- if (StringUtils.isEmpty(ACCESS_KEY)) {
- System.out.println("请配置RocketMQ 的访问控制-用户名 rocketMQAK");
- }
- if (StringUtils.isEmpty(SECRET_KEY)) {
- System.out.println("请配置RocketMQ 的访问控制-密码 rocketMQSK");
- }
if (StringUtils.isEmpty(API_KEY)) {
System.out.println("请配置SupervisorAgent qwen-plus apiKey");
}
@@ -237,7 +228,7 @@ public class SupervisorAgentA2ASDKMainStream {
}
private static void initAgentCardInfo(String accessKey, String secretKey,
String agentName, String agentUrl) {
- if (StringUtils.isEmpty(accessKey) || StringUtils.isEmpty(secretKey)
|| StringUtils.isEmpty(agentName) || StringUtils.isEmpty(agentUrl)) {
+ if (StringUtils.isEmpty(agentName) || StringUtils.isEmpty(agentUrl)) {
System.out.println("initAgentCardInfo param error");
return;
}
@@ -272,7 +263,7 @@ public class SupervisorAgentA2ASDKMainStream {
System.err.println("Streaming error occurred: " +
error.getMessage());
};
RocketMQTransportConfig rocketMQTransportConfig = new
RocketMQTransportConfig();
- rocketMQTransportConfig.setRocketMQInstanceID(ROCKETMQ_INSTANCE_ID);
+ rocketMQTransportConfig.setRocketMQNamespace(ROCKETMQ_NAMESPACE);
rocketMQTransportConfig.setAccessKey(accessKey);
rocketMQTransportConfig.setSecretKey(secretKey);
rocketMQTransportConfig.setWorkAgentResponseGroupID(WORK_AGENT_RESPONSE_GROUP_ID);
diff --git
a/example/rocketmq-multiagent-base-adk/TravelAgent/src/main/java/agent/AgentCardProducer.java
b/example/rocketmq-multiagent-base-adk/TravelAgent/src/main/java/agent/AgentCardProducer.java
index cdfb6b9..f8e4204 100644
---
a/example/rocketmq-multiagent-base-adk/TravelAgent/src/main/java/agent/AgentCardProducer.java
+++
b/example/rocketmq-multiagent-base-adk/TravelAgent/src/main/java/agent/AgentCardProducer.java
@@ -31,7 +31,7 @@ import static
org.apache.rocketmq.a2a.common.RocketMQA2AConstant.ROCKETMQ_PROTOC
@ApplicationScoped
public class AgentCardProducer {
private static final String ROCKETMQ_ENDPOINT =
System.getProperty("rocketMQEndpoint", "");
- private static final String ROCKETMQ_INSTANCE_ID =
System.getProperty("rocketMQInstanceID", "");
+ private static final String ROCKETMQ_NAMESPACE =
System.getProperty("rocketMQNamespace", "");
private static final String BIZ_TOPIC = System.getProperty("bizTopic", "");
@Produces
@@ -64,10 +64,10 @@ public class AgentCardProducer {
}
private static String buildRocketMQUrl() {
- if (StringUtils.isEmpty(ROCKETMQ_ENDPOINT) ||
StringUtils.isEmpty(ROCKETMQ_INSTANCE_ID) || StringUtils.isEmpty(BIZ_TOPIC)) {
+ if (StringUtils.isEmpty(ROCKETMQ_ENDPOINT) ||
StringUtils.isEmpty(BIZ_TOPIC)) {
throw new RuntimeException("buildRocketMQUrl param error, please
check rocketmq config");
}
- return "http://" + ROCKETMQ_ENDPOINT + "/" + ROCKETMQ_INSTANCE_ID +
"/" + BIZ_TOPIC;
+ return "http://" + ROCKETMQ_ENDPOINT + "/" + ROCKETMQ_NAMESPACE + "/"
+ BIZ_TOPIC;
}
}
diff --git
a/example/rocketmq-multiagent-base-adk/WeatherAgent/src/main/java/agent/AgentCardProducer.java
b/example/rocketmq-multiagent-base-adk/WeatherAgent/src/main/java/agent/AgentCardProducer.java
index 0274756..530f0ab 100644
---
a/example/rocketmq-multiagent-base-adk/WeatherAgent/src/main/java/agent/AgentCardProducer.java
+++
b/example/rocketmq-multiagent-base-adk/WeatherAgent/src/main/java/agent/AgentCardProducer.java
@@ -31,7 +31,7 @@ import static
org.apache.rocketmq.a2a.common.RocketMQA2AConstant.ROCKETMQ_PROTOC
@ApplicationScoped
public class AgentCardProducer {
private static final String ROCKETMQ_ENDPOINT =
System.getProperty("rocketMQEndpoint", "");
- private static final String ROCKETMQ_INSTANCE_ID =
System.getProperty("rocketMQInstanceID", "");
+ private static final String ROCKETMQ_NAMESPACE =
System.getProperty("rocketMQNamespace", "");
private static final String BIZ_TOPIC = System.getProperty("bizTopic", "");
@Produces
@@ -64,10 +64,10 @@ public class AgentCardProducer {
}
private static String buildRocketMQUrl() {
- if (StringUtils.isEmpty(ROCKETMQ_ENDPOINT) ||
StringUtils.isEmpty(ROCKETMQ_INSTANCE_ID) || StringUtils.isEmpty(BIZ_TOPIC)) {
+ if (StringUtils.isEmpty(ROCKETMQ_ENDPOINT) ||
StringUtils.isEmpty(BIZ_TOPIC)) {
throw new RuntimeException("buildRocketMQUrl param error, please
check rocketmq config");
}
- return "http://" + ROCKETMQ_ENDPOINT + "/" + ROCKETMQ_INSTANCE_ID +
"/" + BIZ_TOPIC;
+ return "http://" + ROCKETMQ_ENDPOINT + "/" + ROCKETMQ_NAMESPACE + "/"
+ BIZ_TOPIC;
}
}
diff --git a/example/rocketmq-multiagent-base-adk/pom.xml
b/example/rocketmq-multiagent-base-adk/pom.xml
index cb9ef63..4860805 100644
--- a/example/rocketmq-multiagent-base-adk/pom.xml
+++ b/example/rocketmq-multiagent-base-adk/pom.xml
@@ -81,7 +81,7 @@
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-a2a</artifactId>
- <version>1.0.5</version>
+ <version>1.0.6</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
diff --git a/pom.xml b/pom.xml
index 9150a95..92b01dc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-a2a</artifactId>
- <version>1.0.6-SNAPSHOT</version>
+ <version>1.0.6</version>
<name>Apache RocketMQ A2A ${project.version}</name>
<description>Integrate Apache RocketMQ with A2A</description>
diff --git
a/src/main/java/org/apache/rocketmq/a2a/server/RocketMQA2AServerRoutes.java
b/src/main/java/org/apache/rocketmq/a2a/server/RocketMQA2AServerRoutes.java
index 8595f3e..8bed7c8 100644
--- a/src/main/java/org/apache/rocketmq/a2a/server/RocketMQA2AServerRoutes.java
+++ b/src/main/java/org/apache/rocketmq/a2a/server/RocketMQA2AServerRoutes.java
@@ -94,7 +94,7 @@ import static
org.apache.rocketmq.a2a.common.RocketMQA2AConstant.METHOD;
public class RocketMQA2AServerRoutes extends A2AServerRoutes {
private static final Logger log =
LoggerFactory.getLogger(RocketMQA2AServerRoutes.class);
private static final String ROCKETMQ_ENDPOINT =
System.getProperty("rocketMQEndpoint", "");
- private static final String ROCKETMQ_INSTANCE_ID =
System.getProperty("rocketMQInstanceID", "");
+ private static final String ROCKETMQ_NAMESPACE =
System.getProperty("rocketMQNamespace", "");
private static final String BIZ_TOPIC = System.getProperty("bizTopic", "");
private static final String BIZ_CONSUMER_GROUP =
System.getProperty("bizConsumerGroup", "");
private static final String ACCESS_KEY = System.getProperty("rocketMQAk",
"");
@@ -134,7 +134,7 @@ public class RocketMQA2AServerRoutes extends
A2AServerRoutes {
SessionCredentialsProvider sessionCredentialsProvider = new
StaticSessionCredentialsProvider(ACCESS_KEY, SECRET_KEY);
ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder()
.setEndpoints(ROCKETMQ_ENDPOINT)
- .setNamespace(ROCKETMQ_INSTANCE_ID)
+ .setNamespace(ROCKETMQ_NAMESPACE)
.setCredentialProvider(sessionCredentialsProvider)
.setRequestTimeout(Duration.ofSeconds(15))
.build();
@@ -147,7 +147,7 @@ public class RocketMQA2AServerRoutes extends
A2AServerRoutes {
SessionCredentialsProvider sessionCredentialsProvider = new
StaticSessionCredentialsProvider(ACCESS_KEY, SECRET_KEY);
ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder()
.setEndpoints(ROCKETMQ_ENDPOINT)
- .setNamespace(ROCKETMQ_INSTANCE_ID)
+ .setNamespace(ROCKETMQ_NAMESPACE)
.setCredentialProvider(sessionCredentialsProvider)
.build();
String tag = "*";
@@ -409,26 +409,16 @@ public class RocketMQA2AServerRoutes extends
A2AServerRoutes {
}
private void checkConfigParam() {
- if (StringUtils.isEmpty(ROCKETMQ_ENDPOINT) ||
StringUtils.isEmpty(ROCKETMQ_INSTANCE_ID) || StringUtils.isEmpty(BIZ_TOPIC) ||
- StringUtils.isEmpty(BIZ_CONSUMER_GROUP) ||
StringUtils.isEmpty(ACCESS_KEY) || StringUtils.isEmpty(SECRET_KEY)) {
+ if (StringUtils.isEmpty(ROCKETMQ_ENDPOINT) ||
StringUtils.isEmpty(BIZ_TOPIC) || StringUtils.isEmpty(BIZ_CONSUMER_GROUP)) {
if (StringUtils.isEmpty(ROCKETMQ_ENDPOINT)) {
log.info("rocketMQEndpoint is empty");
}
- if (StringUtils.isEmpty(ROCKETMQ_INSTANCE_ID)) {
- log.info("rocketMQInstanceID is empty");
- }
if (StringUtils.isEmpty(BIZ_TOPIC)) {
log.info("bizTopic is empty");
}
if (StringUtils.isEmpty(BIZ_CONSUMER_GROUP)) {
log.info("bizConsumerGroup is empty");
}
- if (StringUtils.isEmpty(ACCESS_KEY)) {
- log.info("rocketMQAK is empty");
- }
- if (StringUtils.isEmpty(SECRET_KEY)) {
- log.info("rocketMQSK is empty");
- }
throw new RuntimeException("RocketMQA2AServerRoutes check init
rocketmq param error, init failed!!!");
}
}
diff --git
a/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransport.java
b/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransport.java
index 027d63c..821ba11 100644
--- a/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransport.java
+++ b/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransport.java
@@ -107,18 +107,18 @@ public class RocketMQTransport implements ClientTransport
{
private static final TypeReference<ListTaskPushNotificationConfigResponse>
LIST_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>() {
};
private static final
TypeReference<DeleteTaskPushNotificationConfigResponse>
DELETE_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>()
{ };
private static final TypeReference<GetAuthenticatedExtendedCardResponse>
GET_AUTHENTICATED_EXTENDED_CARD_RESPONSE_REFERENCE = new TypeReference<>() { };
- private static final ConcurrentMap<String /* instance */, Map<String /*
WorkerAgentResponseTopic */, LitePushConsumer>> ROCKETMQ_CONSUMER_MAP = new
ConcurrentHashMap<>();
- private static final ConcurrentMap<String /* instance */, Map<String /*
agentTopic */, Producer>> ROCKETMQ_PRODUCER_MAP = new ConcurrentHashMap<>();
- private static final ConcurrentMap<String /* instance */, Map<String /*
msgId */, CompletableFuture<String>>> MESSAGE_RESPONSE_MAP = new
ConcurrentHashMap<>();
- private static final ConcurrentMap<String /* instance */, Map<String /*
msgId */, SSEEventListener>> MESSAGE_STREAM_RESPONSE_MAP = new
ConcurrentHashMap<>();
- private static final ConcurrentMap<String /* instance */, Map<String /*
liteTopic */, Boolean>> LITE_TOPIC_USE_DEFAULT_RECOVER_MAP = new
ConcurrentHashMap<>();
- private static final ConcurrentMap<String /* instance */, Map<String /*
Key */, SSEEventListener>> RECOVER_MESSAGE_STREAM_RESPONSE_MAP = new
ConcurrentHashMap<>();
+ private static final ConcurrentMap<String /* namespace */, Map<String /*
WorkerAgentResponseTopic */, LitePushConsumer>> ROCKETMQ_CONSUMER_MAP = new
ConcurrentHashMap<>();
+ private static final ConcurrentMap<String /* namespace */, Map<String /*
agentTopic */, Producer>> ROCKETMQ_PRODUCER_MAP = new ConcurrentHashMap<>();
+ private static final ConcurrentMap<String /* namespace */, Map<String /*
msgId */, CompletableFuture<String>>> MESSAGE_RESPONSE_MAP = new
ConcurrentHashMap<>();
+ private static final ConcurrentMap<String /* namespace */, Map<String /*
msgId */, SSEEventListener>> MESSAGE_STREAM_RESPONSE_MAP = new
ConcurrentHashMap<>();
+ private static final ConcurrentMap<String /* namespace */, Map<String /*
liteTopic */, Boolean>> LITE_TOPIC_USE_DEFAULT_RECOVER_MAP = new
ConcurrentHashMap<>();
+ private static final ConcurrentMap<String /* namespace */, Map<String /*
Key */, SSEEventListener>> RECOVER_MESSAGE_STREAM_RESPONSE_MAP = new
ConcurrentHashMap<>();
private final String agentTopic;
private final String accessKey;
private final String secretKey;
private final String rocketMQEndpoint;
- private final String rocketMQInstanceID;
+ private final String rocketNamespace;
private final String workAgentResponseTopic;
private final String workAgentResponseGroupID;
private final List<ClientCallInterceptor> interceptors;
@@ -131,7 +131,7 @@ public class RocketMQTransport implements ClientTransport {
private LitePushConsumer litePushConsumer;
private Producer producer;
- public RocketMQTransport(String rocketMQInstanceID, String accessKey,
String secretKey, String workAgentResponseTopic, String
workAgentResponseGroupID,
+ public RocketMQTransport(String rocketNamespace, String accessKey, String
secretKey, String workAgentResponseTopic, String workAgentResponseGroupID,
List<ClientCallInterceptor> interceptors, String agentUrl,
A2AHttpClient httpClient, String liteTopic, boolean useDefaultRecoverMode,
AgentCard agentCard) {
this.accessKey = accessKey;
this.secretKey = secretKey;
@@ -146,27 +146,27 @@ public class RocketMQTransport implements ClientTransport
{
}
this.useDefaultRecoverMode = useDefaultRecoverMode;
this.agentCard = agentCard;
- RocketMQInstanceInfo rocketAgentCardInfo =
parseAgentCardAddition(this.agentCard);
+ RocketMQResourceInfo rocketAgentCardInfo =
parseAgentCardAddition(this.agentCard);
if (null == rocketAgentCardInfo) {
throw new RuntimeException("RocketMQTransport rocketAgentCardInfo
pare error");
}
- if (!rocketMQInstanceID.equals(rocketAgentCardInfo.getInstance())) {
- throw new RuntimeException("RocketMQTransport rocketAgentCardInfo
instance do not match, please check the config info");
+ if (null != rocketNamespace &&
!rocketNamespace.equals(rocketAgentCardInfo.getNamespace())) {
+ throw new RuntimeException("RocketMQTransport rocketAgentCardInfo
namespace do not match, please check the config info");
}
this.rocketMQEndpoint = rocketAgentCardInfo.getEndpoint();
this.agentTopic = rocketAgentCardInfo.getTopic();
- this.rocketMQInstanceID = rocketAgentCardInfo.getInstance();
-
LITE_TOPIC_USE_DEFAULT_RECOVER_MAP.computeIfAbsent(this.rocketMQInstanceID, k
-> new HashMap<>()).put(this.liteTopic, useDefaultRecoverMode);
+ this.rocketNamespace =
StringUtils.isEmpty(rocketAgentCardInfo.getNamespace()) ? "" :
rocketAgentCardInfo.getNamespace();
+
LITE_TOPIC_USE_DEFAULT_RECOVER_MAP.computeIfAbsent(this.rocketNamespace, k ->
new HashMap<>()).put(this.liteTopic, useDefaultRecoverMode);
checkConfigParam();
initRocketMQProducerAndConsumer();
}
private void initRocketMQProducerAndConsumer() {
- if (StringUtils.isEmpty(this.rocketMQEndpoint) ||
StringUtils.isEmpty(this.rocketMQInstanceID) ||
StringUtils.isEmpty(this.workAgentResponseTopic) ||
StringUtils.isEmpty(this.liteTopic)) {
+ if (StringUtils.isEmpty(this.rocketMQEndpoint) ||
StringUtils.isEmpty(this.workAgentResponseTopic) ||
StringUtils.isEmpty(this.liteTopic)) {
throw new A2AClientException("RocketMQTransport
initRocketMQProducerAndConsumer param error");
}
try {
- Map<String, LitePushConsumer> consumerMap =
ROCKETMQ_CONSUMER_MAP.computeIfAbsent(this.rocketMQInstanceID, k -> new
HashMap<>());
+ Map<String, LitePushConsumer> consumerMap =
ROCKETMQ_CONSUMER_MAP.computeIfAbsent(this.rocketNamespace, k -> new
HashMap<>());
if (consumerMap.containsKey(this.workAgentResponseTopic)) {
this.litePushConsumer =
consumerMap.get(this.workAgentResponseTopic);
this.litePushConsumer.subscribeLite(this.liteTopic);
@@ -184,7 +184,7 @@ public class RocketMQTransport implements ClientTransport {
this.litePushConsumer = litePushConsumer;
}
}
- Map<String, Producer> producerMap =
ROCKETMQ_PRODUCER_MAP.computeIfAbsent(this.rocketMQInstanceID, k -> new
HashMap<>());
+ Map<String, Producer> producerMap =
ROCKETMQ_PRODUCER_MAP.computeIfAbsent(this.rocketNamespace, k -> new
HashMap<>());
if (!producerMap.containsKey(this.agentTopic)) {
this.producer = buildProducer(this.agentTopic);
producerMap.put(this.agentTopic, this.producer);
@@ -210,7 +210,7 @@ public class RocketMQTransport implements ClientTransport {
log.error("RocketMQTransport sendMessage error,
responseMessageId is null");
return null;
}
- Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.rocketMQInstanceID, k -> new
HashMap<>());
+ Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.rocketNamespace, k -> new
HashMap<>());
CompletableFuture<String> objectCompletableFuture = new
CompletableFuture<>();
completableFutureMap.put(responseMessageId,
objectCompletableFuture);
String result = objectCompletableFuture.get(120, TimeUnit.SECONDS);
@@ -241,7 +241,7 @@ public class RocketMQTransport implements ClientTransport {
log.error("RocketMQTransport sendMessageStreaming error,
responseMessageId is null");
return;
}
-
MESSAGE_STREAM_RESPONSE_MAP.computeIfAbsent(this.rocketMQInstanceID, k -> new
HashMap<>()).put(responseMessageId, sseEventListener);
+ MESSAGE_STREAM_RESPONSE_MAP.computeIfAbsent(this.rocketNamespace,
k -> new HashMap<>()).put(responseMessageId, sseEventListener);
log.info("RocketMQTransport sendMessageStreaming success,
responseMessageId: {}", responseMessageId);
} catch (Exception e) {
throw new A2AClientException("RocketMQTransport Failed to send
streaming message request: " + e, e);
@@ -258,24 +258,24 @@ public class RocketMQTransport implements ClientTransport
{
if (null != request.metadata()) {
String responseMessageId =
(String)request.metadata().get(RocketMQA2AConstant.MESSAGE_RESPONSE_ID);
if (!StringUtils.isEmpty(responseMessageId)) {
-
MESSAGE_STREAM_RESPONSE_MAP.computeIfAbsent(this.rocketMQInstanceID, k -> new
HashMap<>()).put(responseMessageId, sseEventListener);
+
MESSAGE_STREAM_RESPONSE_MAP.computeIfAbsent(this.rocketNamespace, k -> new
HashMap<>()).put(responseMessageId, sseEventListener);
}
String liteTopic =
(String)request.metadata().get(RocketMQA2AConstant.LITE_TOPIC);
if (null != litePushConsumer &&
!StringUtils.isEmpty(liteTopic)) {
litePushConsumer.subscribeLite(liteTopic);
log.info("litePushConsumer subscribeLite liteTopic: {}",
liteTopic);
-
LITE_TOPIC_USE_DEFAULT_RECOVER_MAP.computeIfAbsent(this.rocketMQInstanceID, k
-> new HashMap<>()).put(liteTopic, this.useDefaultRecoverMode);
+
LITE_TOPIC_USE_DEFAULT_RECOVER_MAP.computeIfAbsent(this.rocketNamespace, k ->
new HashMap<>()).put(liteTopic, this.useDefaultRecoverMode);
}
String closeLiteTopic =
(String)request.metadata().get(RocketMQA2AConstant.CLOSE_LITE_TOPIC);
if (null != litePushConsumer &&
!StringUtils.isEmpty(closeLiteTopic)) {
litePushConsumer.unsubscribeLite(closeLiteTopic);
log.info("litePushConsumer unsubscribeLite liteTopic: {}",
closeLiteTopic);
-
LITE_TOPIC_USE_DEFAULT_RECOVER_MAP.computeIfAbsent(this.rocketMQInstanceID, k
-> new HashMap<>()).remove(closeLiteTopic);
+
LITE_TOPIC_USE_DEFAULT_RECOVER_MAP.computeIfAbsent(this.rocketNamespace, k ->
new HashMap<>()).remove(closeLiteTopic);
}
}
if (this.useDefaultRecoverMode) {
-
RECOVER_MESSAGE_STREAM_RESPONSE_MAP.computeIfAbsent(rocketMQInstanceID, k ->
new HashMap<>()).put(RocketMQA2AConstant.DEFAULT_STREAM_RECOVER,
sseEventListener);
+
RECOVER_MESSAGE_STREAM_RESPONSE_MAP.computeIfAbsent(rocketNamespace, k -> new
HashMap<>()).put(RocketMQA2AConstant.DEFAULT_STREAM_RECOVER, sseEventListener);
}
} catch (Exception e) {
throw new A2AClientException("RocketMQTransport failed to
resubscribe streaming message request: " + e, e);
@@ -296,7 +296,7 @@ public class RocketMQTransport implements ClientTransport {
log.error("RocketMQTransport getTask error, responseMessageId
is null");
return null;
}
- Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.rocketMQInstanceID, k -> new
HashMap<>());
+ Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.rocketNamespace, k -> new
HashMap<>());
CompletableFuture<String> objectCompletableFuture = new
CompletableFuture<>();
completableFutureMap.put(responseMessageId,
objectCompletableFuture);
String result = objectCompletableFuture.get(120, TimeUnit.SECONDS);
@@ -324,7 +324,7 @@ public class RocketMQTransport implements ClientTransport {
log.error("RocketMQTransport cancelTask error,
responseMessageId is null");
return null;
}
- Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.rocketMQInstanceID, k -> new
HashMap<>());
+ Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.rocketNamespace, k -> new
HashMap<>());
CompletableFuture<String> objectCompletableFuture = new
CompletableFuture<>();
completableFutureMap.put(responseMessageId,
objectCompletableFuture);
String result = objectCompletableFuture.get(120, TimeUnit.SECONDS);
@@ -353,7 +353,7 @@ public class RocketMQTransport implements ClientTransport {
log.error("RocketMQTransport
setTaskPushNotificationConfiguration error, responseMessageId is null");
return null;
}
- Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.rocketMQInstanceID, k -> new
HashMap<>());
+ Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.rocketNamespace, k -> new
HashMap<>());
CompletableFuture<String> objectCompletableFuture = new
CompletableFuture<>();
completableFutureMap.put(responseMessageId,
objectCompletableFuture);
String result = objectCompletableFuture.get(120, TimeUnit.SECONDS);
@@ -383,7 +383,7 @@ public class RocketMQTransport implements ClientTransport {
log.error("RocketMQTransport
getTaskPushNotificationConfiguration error, responseMessageId is null");
return null;
}
- Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.rocketMQInstanceID, k -> new
HashMap<>());
+ Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.rocketNamespace, k -> new
HashMap<>());
CompletableFuture<String> completableFuture = new
CompletableFuture<>();
completableFutureMap.put(responseMessageId, completableFuture);
String result = completableFuture.get(120, TimeUnit.SECONDS);
@@ -411,7 +411,7 @@ public class RocketMQTransport implements ClientTransport {
log.error("RocketMQTransport
listTaskPushNotificationConfigurations error, responseMessageId is null");
return null;
}
- Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.rocketMQInstanceID, k -> new
HashMap<>());
+ Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.rocketNamespace, k -> new
HashMap<>());
CompletableFuture<String> objectCompletableFuture = new
CompletableFuture<>();
completableFutureMap.put(responseMessageId,
objectCompletableFuture);
String result = objectCompletableFuture.get(120, TimeUnit.SECONDS);
@@ -439,7 +439,7 @@ public class RocketMQTransport implements ClientTransport {
log.error("RocketMQTransport
deleteTaskPushNotificationConfigurations error, responseMessageId is null");
return;
}
- Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.rocketMQInstanceID, k -> new
HashMap<>());
+ Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.rocketNamespace, k -> new
HashMap<>());
CompletableFuture<String> objectCompletableFuture = new
CompletableFuture<>();
completableFutureMap.put(responseMessageId,
objectCompletableFuture);
objectCompletableFuture.get(120, TimeUnit.SECONDS);
@@ -472,7 +472,7 @@ public class RocketMQTransport implements ClientTransport {
log.error("RocketMQTransport getAgentCard
responseMessageId is null");
return null;
}
- Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.rocketMQInstanceID, k -> new
HashMap<>());
+ Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.rocketNamespace, k -> new
HashMap<>());
CompletableFuture<String> objectCompletableFuture = new
CompletableFuture<>();
completableFutureMap.put(responseMessageId,
objectCompletableFuture);
String result = objectCompletableFuture.get(120,
TimeUnit.SECONDS);
@@ -503,12 +503,9 @@ public class RocketMQTransport implements ClientTransport {
}
private void checkConfigParam() {
- if (StringUtils.isEmpty(this.rocketMQInstanceID) ||
StringUtils.isEmpty(this.rocketMQEndpoint) ||
StringUtils.isEmpty(this.workAgentResponseTopic) ||
- StringUtils.isEmpty(this.workAgentResponseGroupID) ||
StringUtils.isEmpty(this.liteTopic) || StringUtils.isEmpty(agentTopic) ||
StringUtils.isEmpty(accessKey) || StringUtils.isEmpty(secretKey)) {
+ if (StringUtils.isEmpty(this.rocketMQEndpoint) ||
StringUtils.isEmpty(this.workAgentResponseTopic) ||
+ StringUtils.isEmpty(this.workAgentResponseGroupID) ||
StringUtils.isEmpty(this.liteTopic) || StringUtils.isEmpty(agentTopic)) {
- if (StringUtils.isEmpty(this.rocketMQInstanceID)) {
- log.info("RocketMQTransport checkConfigParam
rocketMQInstanceID is empty");
- }
if (StringUtils.isEmpty(this.rocketMQEndpoint)) {
log.info("RocketMQTransport checkConfigParam rocketMQEndpoint
is empty");
}
@@ -524,12 +521,6 @@ public class RocketMQTransport implements ClientTransport {
if (StringUtils.isEmpty(this.agentTopic)) {
log.info("RocketMQTransport checkConfigParam agentTopic is
empty");
}
- if (StringUtils.isEmpty(this.accessKey)) {
- log.info("RocketMQTransport checkConfigParam accessKey is
empty");
- }
- if (StringUtils.isEmpty(this.secretKey)) {
- log.info("RocketMQTransport checkConfigParam secretKey is
empty");
- }
throw new RuntimeException("RocketMQTransport checkConfigParam
error, init failed !!!");
}
}
@@ -548,7 +539,7 @@ public class RocketMQTransport implements ClientTransport {
}
private LitePushConsumer buildConsumer() throws ClientException {
- if (StringUtils.isEmpty(this.rocketMQEndpoint) ||
StringUtils.isEmpty(this.accessKey) ||
StringUtils.isEmpty(this.workAgentResponseGroupID) ||
StringUtils.isEmpty(this.workAgentResponseTopic)) {
+ if (StringUtils.isEmpty(this.rocketMQEndpoint) ||
StringUtils.isEmpty(this.workAgentResponseGroupID) ||
StringUtils.isEmpty(this.workAgentResponseTopic)) {
log.error("RocketMQTransport buildConsumer check param error");
return null;
}
@@ -556,7 +547,7 @@ public class RocketMQTransport implements ClientTransport {
SessionCredentialsProvider sessionCredentialsProvider = new
StaticSessionCredentialsProvider(accessKey, secretKey);
ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder()
.setEndpoints(this.rocketMQEndpoint)
- .setNamespace(this.rocketMQInstanceID)
+ .setNamespace(this.rocketNamespace)
.setCredentialProvider(sessionCredentialsProvider)
.build();
LitePushConsumer litePushConsumer =
provider.newLitePushConsumerBuilder()
@@ -580,9 +571,9 @@ public class RocketMQTransport implements ClientTransport {
return ConsumeResult.SUCCESS;
}
if (!response.isStream()) {
- return dealNonStreamResult(response,
this.rocketMQInstanceID);
+ return dealNonStreamResult(response,
this.rocketNamespace);
}
- return dealStreamResult(response, this.rocketMQInstanceID,
liteTopic);
+ return dealStreamResult(response, this.rocketNamespace,
liteTopic);
} catch (Exception e) {
log.error("RocketMQTransport litePushConsumer consumer
error, msgId: {}, error: {}", messageView.getMessageId(), e.getMessage());
return ConsumeResult.SUCCESS;
@@ -591,20 +582,20 @@ public class RocketMQTransport implements ClientTransport
{
return litePushConsumer;
}
- private ConsumeResult dealStreamResult(RocketMQResponse response, String
rocketMQInstanceID, String liteTopic) {
- if (null == response || StringUtils.isEmpty(response.getMessageId())
|| StringUtils.isEmpty(rocketMQInstanceID) || StringUtils.isEmpty(liteTopic) ||
!response.isEnd() && StringUtils.isEmpty(response.getResponseBody())) {
- log.error("RocketMQTransport dealStreamResult param is error,
response: {}, rocketMQInstanceID: {}, liteTopic: {}",
JSON.toJSONString(response), rocketMQInstanceID, liteTopic);
+ private ConsumeResult dealStreamResult(RocketMQResponse response, String
rocketMQNamespace, String liteTopic) {
+ if (null == response || StringUtils.isEmpty(response.getMessageId())
|| StringUtils.isEmpty(liteTopic) || !response.isEnd() &&
StringUtils.isEmpty(response.getResponseBody())) {
+ log.error("RocketMQTransport dealStreamResult param is error,
response: {}, liteTopic: {}", JSON.toJSONString(response), liteTopic);
return ConsumeResult.SUCCESS;
}
- Map<String, SSEEventListener> sseEventListenerMap =
MESSAGE_STREAM_RESPONSE_MAP.get(rocketMQInstanceID);
+ Map<String, SSEEventListener> sseEventListenerMap =
MESSAGE_STREAM_RESPONSE_MAP.get(rocketMQNamespace);
if (null == sseEventListenerMap) {
log.error("RocketMQTransport dealStreamResult sseEventListenerMap
is null");
return ConsumeResult.SUCCESS;
}
SSEEventListener sseEventListener =
sseEventListenerMap.get(response.getMessageId());
if (null == sseEventListener) {
- Map<String, Boolean> booleanMap =
LITE_TOPIC_USE_DEFAULT_RECOVER_MAP.get(rocketMQInstanceID);
+ Map<String, Boolean> booleanMap =
LITE_TOPIC_USE_DEFAULT_RECOVER_MAP.get(rocketMQNamespace);
if (null == booleanMap) {
log.error("RocketMQTransport dealStreamResult booleanMap is
null");
return ConsumeResult.SUCCESS;
@@ -613,8 +604,8 @@ public class RocketMQTransport implements ClientTransport {
if (null == useDefaultRecoverModeConsumer ||
!useDefaultRecoverModeConsumer) {
return ConsumeResult.SUCCESS;
}
- if (!RECOVER_MESSAGE_STREAM_RESPONSE_MAP.isEmpty() &&
RECOVER_MESSAGE_STREAM_RESPONSE_MAP.containsKey(rocketMQInstanceID)) {
- Map<String, SSEEventListener> sseEventListenerMapRecover =
RECOVER_MESSAGE_STREAM_RESPONSE_MAP.get(rocketMQInstanceID);
+ if (!RECOVER_MESSAGE_STREAM_RESPONSE_MAP.isEmpty() &&
RECOVER_MESSAGE_STREAM_RESPONSE_MAP.containsKey(rocketMQNamespace)) {
+ Map<String, SSEEventListener> sseEventListenerMapRecover =
RECOVER_MESSAGE_STREAM_RESPONSE_MAP.get(rocketMQNamespace);
if (null == sseEventListenerMapRecover) {
log.error("RocketMQTransport dealStreamResult
sseEventListenerMapRecover is null");
return ConsumeResult.SUCCESS;
@@ -647,12 +638,12 @@ public class RocketMQTransport implements ClientTransport
{
return ConsumeResult.SUCCESS;
}
- private ConsumeResult dealNonStreamResult(RocketMQResponse response,
String rocketMQInstanceID) {
- if (null == response || StringUtils.isEmpty(response.getMessageId())
|| StringUtils.isEmpty(response.getResponseBody()) ||
StringUtils.isEmpty(rocketMQInstanceID)) {
- log.error("RocketMQTransport dealNonStreamResult param is error,
response: {}, rocketMQInstanceID: {}", JSON.toJSONString(response),
rocketMQInstanceID);
+ private ConsumeResult dealNonStreamResult(RocketMQResponse response,
String rocketMQNamespace) {
+ if (null == response || StringUtils.isEmpty(response.getMessageId())
|| StringUtils.isEmpty(response.getResponseBody())) {
+ log.error("RocketMQTransport dealNonStreamResult param is error,
response: {}", JSON.toJSONString(response));
return ConsumeResult.SUCCESS;
}
- Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.get(rocketMQInstanceID);
+ Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.get(rocketMQNamespace);
if (null != completableFutureMap &&
completableFutureMap.containsKey(response.getMessageId())) {
CompletableFuture<String> completableFuture =
completableFutureMap.get(response.getMessageId());
completableFuture.complete(response.getResponseBody());
@@ -725,7 +716,7 @@ public class RocketMQTransport implements ClientTransport {
SessionCredentialsProvider sessionCredentialsProvider = new
StaticSessionCredentialsProvider(accessKey, secretKey);
ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder()
.setEndpoints(this.rocketMQEndpoint)
- .setNamespace(this.rocketMQInstanceID)
+ .setNamespace(this.rocketNamespace)
.setCredentialProvider(sessionCredentialsProvider)
.setRequestTimeout(Duration.ofSeconds(15))
.build();
@@ -735,8 +726,7 @@ public class RocketMQTransport implements ClientTransport {
return builder.build();
}
- private PayloadAndHeaders applyInterceptors(String methodName, Object
payload,
- AgentCard agentCard, ClientCallContext clientCallContext) {
+ private PayloadAndHeaders applyInterceptors(String methodName, Object
payload, AgentCard agentCard, ClientCallContext clientCallContext) {
PayloadAndHeaders payloadAndHeaders = new PayloadAndHeaders(payload,
getHttpHeaders(clientCallContext));
if (interceptors != null && !interceptors.isEmpty()) {
for (ClientCallInterceptor interceptor : interceptors) {
@@ -750,19 +740,20 @@ public class RocketMQTransport implements ClientTransport
{
return context != null ? context.getHeaders() : Collections.emptyMap();
}
- private RocketMQInstanceInfo parseAgentCardAddition(AgentCard agentCard) {
+ private RocketMQResourceInfo parseAgentCardAddition(AgentCard agentCard) {
if (null == agentCard ||
StringUtils.isEmpty(agentCard.preferredTransport()) ||
StringUtils.isEmpty(agentCard.url()) || null ==
agentCard.additionalInterfaces() || agentCard.additionalInterfaces().isEmpty())
{
log.info("parseAgentCardAddition param error, agentCard: {}",
JSON.toJSONString(agentCard));
return null;
}
- RocketMQInstanceInfo rocketMQInstanceInfo = null;
+ RocketMQResourceInfo rocketMQResourceInfo = null;
String preferredTransport = agentCard.preferredTransport();
if (RocketMQA2AConstant.ROCKETMQ_PROTOCOL.equals(preferredTransport)) {
String url = agentCard.url();
- rocketMQInstanceInfo = pareAgentCardUrl(url);
- if (null != rocketMQInstanceInfo &&
!StringUtils.isEmpty(rocketMQInstanceInfo.getEndpoint()) &&
!StringUtils.isEmpty(rocketMQInstanceInfo.getInstance()) &&
!StringUtils.isEmpty(rocketMQInstanceInfo.getTopic())) {
- log.info("RocketMQTransport get rocketMQInstanceInfo from
preferredTransport");
- return rocketMQInstanceInfo;
+ rocketMQResourceInfo = pareAgentCardUrl(url);
+ if (null != rocketMQResourceInfo &&
!StringUtils.isEmpty(rocketMQResourceInfo.getEndpoint()) &&
!StringUtils.isEmpty(
+ rocketMQResourceInfo.getTopic())) {
+ log.info("RocketMQTransport get rocketMQResourceInfo from
preferredTransport");
+ return rocketMQResourceInfo;
}
}
List<AgentInterface> agentInterfaces =
agentCard.additionalInterfaces();
@@ -770,17 +761,18 @@ public class RocketMQTransport implements ClientTransport
{
String transport = agentInterface.transport();
if (!StringUtils.isEmpty(transport) &&
RocketMQA2AConstant.ROCKETMQ_PROTOCOL.equals(transport)) {
String url = agentInterface.url();
- rocketMQInstanceInfo = pareAgentCardUrl(url);
- if (null != rocketMQInstanceInfo &&
!StringUtils.isEmpty(rocketMQInstanceInfo.getEndpoint()) &&
!StringUtils.isEmpty(rocketMQInstanceInfo.getInstance()) &&
!StringUtils.isEmpty(rocketMQInstanceInfo.getTopic())) {
- log.info("RocketMQTransport get rocketMQInstanceInfo from
additionalInterfaces");
- return rocketMQInstanceInfo;
+ rocketMQResourceInfo = pareAgentCardUrl(url);
+ if (null != rocketMQResourceInfo &&
!StringUtils.isEmpty(rocketMQResourceInfo.getEndpoint()) &&
!StringUtils.isEmpty(
+ rocketMQResourceInfo.getTopic())) {
+ log.info("RocketMQTransport get rocketMQResourceInfo from
additionalInterfaces");
+ return rocketMQResourceInfo;
}
}
}
return null;
}
- private static RocketMQInstanceInfo pareAgentCardUrl(String agentCardUrl) {
+ private static RocketMQResourceInfo pareAgentCardUrl(String agentCardUrl) {
if (StringUtils.isEmpty(agentCardUrl)) {
return null;
}
@@ -790,24 +782,24 @@ public class RocketMQTransport implements ClientTransport
{
if (split.length != 3) {
return null;
}
- RocketMQInstanceInfo rocketMQInstanceInfo = new RocketMQInstanceInfo();
- rocketMQInstanceInfo.setEndpoint(split[0].trim());
- rocketMQInstanceInfo.setInstance(split[1].trim());
- rocketMQInstanceInfo.setTopic(split[2].trim());
- return rocketMQInstanceInfo;
+ RocketMQResourceInfo rocketMQResourceInfo = new RocketMQResourceInfo();
+ rocketMQResourceInfo.setEndpoint(split[0].trim());
+ rocketMQResourceInfo.setNamespace(split[1].trim());
+ rocketMQResourceInfo.setTopic(split[2].trim());
+ return rocketMQResourceInfo;
}
- private static class RocketMQInstanceInfo {
+ private static class RocketMQResourceInfo {
private String endpoint;
private String topic;
- private String instance;
+ private String namespace;
- public RocketMQInstanceInfo(String endpoint, String topic) {
+ public RocketMQResourceInfo(String endpoint, String topic) {
this.endpoint = endpoint;
this.topic = topic;
}
- public RocketMQInstanceInfo() {}
+ public RocketMQResourceInfo() {}
public String getEndpoint() {
return endpoint;
@@ -825,12 +817,12 @@ public class RocketMQTransport implements ClientTransport
{
this.topic = topic;
}
- public String getInstance() {
- return instance;
+ public String getNamespace() {
+ return namespace;
}
- public void setInstance(String instance) {
- this.instance = instance;
+ public void setNamespace(String namespace) {
+ this.namespace = namespace;
}
}
diff --git
a/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransportConfig.java
b/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransportConfig.java
index 800edee..49757c6 100644
---
a/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransportConfig.java
+++
b/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransportConfig.java
@@ -23,7 +23,7 @@ public class RocketMQTransportConfig extends
ClientTransportConfig<RocketMQTrans
private String accessKey;
private String secretKey;
private String globalEndpoint;
- private String rocketMQInstanceID;
+ private String rocketMQNamespace;
private String workAgentResponseTopic;
private String workAgentResponseGroupID;
private String agentTopic;
@@ -31,24 +31,24 @@ public class RocketMQTransportConfig extends
ClientTransportConfig<RocketMQTrans
private String liteTopic;
private boolean useDefaultRecoverMode = false;
- public RocketMQTransportConfig(String accessKey, String secretKey, String
globalEndpoint, String rocketMQInstanceID,
+ public RocketMQTransportConfig(String accessKey, String secretKey, String
globalEndpoint, String rocketMQNamespace,
String workAgentResponseTopic, String workAgentResponseGroupID, String
agentTopic, A2AHttpClient httpClient) {
this.accessKey = accessKey;
this.secretKey = secretKey;
this.globalEndpoint = globalEndpoint;
- this.rocketMQInstanceID = rocketMQInstanceID;
+ this.rocketMQNamespace = rocketMQNamespace;
this.workAgentResponseTopic = workAgentResponseTopic;
this.workAgentResponseGroupID = workAgentResponseGroupID;
this.agentTopic = agentTopic;
this.httpClient = httpClient;
}
- public RocketMQTransportConfig(String accessKey, String secretKey, String
globalEndpoint, String rocketMQInstanceID,
+ public RocketMQTransportConfig(String accessKey, String secretKey, String
globalEndpoint, String rocketMQNamespace,
String workAgentResponseTopic, String workAgentResponseGroupID, String
agentTopic, A2AHttpClient httpClient, String liteTopic, boolean
useDefaultRecoverMode) {
this.accessKey = accessKey;
this.secretKey = secretKey;
this.globalEndpoint = globalEndpoint;
- this.rocketMQInstanceID = rocketMQInstanceID;
+ this.rocketMQNamespace = rocketMQNamespace;
this.workAgentResponseTopic = workAgentResponseTopic;
this.workAgentResponseGroupID = workAgentResponseGroupID;
this.agentTopic = agentTopic;
@@ -57,11 +57,11 @@ public class RocketMQTransportConfig extends
ClientTransportConfig<RocketMQTrans
this.useDefaultRecoverMode = useDefaultRecoverMode;
}
- public RocketMQTransportConfig(String accessKey, String secretKey, String
globalEndpoint, String rocketMQInstanceID, String agentTopic, A2AHttpClient
httpClient) {
+ public RocketMQTransportConfig(String accessKey, String secretKey, String
globalEndpoint, String rocketMQNamespace, String agentTopic, A2AHttpClient
httpClient) {
this.accessKey = accessKey;
this.secretKey = secretKey;
this.globalEndpoint = globalEndpoint;
- this.rocketMQInstanceID = rocketMQInstanceID;
+ this.rocketMQNamespace = rocketMQNamespace;
this.agentTopic = agentTopic;
this.httpClient = httpClient;
}
@@ -96,12 +96,12 @@ public class RocketMQTransportConfig extends
ClientTransportConfig<RocketMQTrans
this.globalEndpoint = globalEndpoint;
}
- public String getRocketMQInstanceID() {
- return rocketMQInstanceID;
+ public String getRocketMQNamespace() {
+ return rocketMQNamespace;
}
- public void setRocketMQInstanceID(String rocketMQInstanceID) {
- this.rocketMQInstanceID = rocketMQInstanceID;
+ public void setRocketMQNamespace(String rocketMQNamespace) {
+ this.rocketMQNamespace = rocketMQNamespace;
}
public String getWorkAgentResponseTopic() {
diff --git
a/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransportProvider.java
b/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransportProvider.java
index 36428a6..2b98291 100644
---
a/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransportProvider.java
+++
b/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransportProvider.java
@@ -30,7 +30,7 @@ public class RocketMQTransportProvider implements
ClientTransportProvider<Rocket
if (clientTransportConfig == null) {
clientTransportConfig = new RocketMQTransportConfig(new
JdkA2AHttpClient());
}
- return new
RocketMQTransport(clientTransportConfig.getRocketMQInstanceID(),
clientTransportConfig.getAccessKey(), clientTransportConfig.getSecretKey(),
clientTransportConfig.getWorkAgentResponseTopic(),
clientTransportConfig.getWorkAgentResponseGroupID(),
clientTransportConfig.getInterceptors(), clientTransportConfig.getAgentUrl(),
clientTransportConfig.getHttpClient(), clientTransportConfig.getLiteTopic(),
clientTransportConfig.isUseDefaultRecoverMode(), agentCard);
+ return new
RocketMQTransport(clientTransportConfig.getRocketMQNamespace(),
clientTransportConfig.getAccessKey(), clientTransportConfig.getSecretKey(),
clientTransportConfig.getWorkAgentResponseTopic(),
clientTransportConfig.getWorkAgentResponseGroupID(),
clientTransportConfig.getInterceptors(), clientTransportConfig.getAgentUrl(),
clientTransportConfig.getHttpClient(), clientTransportConfig.getLiteTopic(),
clientTransportConfig.isUseDefaultRecoverMode(), agentCard);
}
@Override