dugenkui03 opened a new issue, #4120:
URL: https://github.com/apache/rocketmq/issues/4120
#### **FEATURE REQUEST**
I suggest that ADD convient method in `MQProducer` which return
`CompletableFuture<SendResult>`.
#### ***功能要求(中文描述)**
`DefaultMQProducer` 中的异步发送方法没有返回`Future`,因此开发者想要在后续的业务处理中使用结果将会很困难。我建议新增带有
`CompletableFuture<SendResult>` 返回结果的异步发送方法。
- 接口定义
```java
CompletableFuture<SendResult> sendAsync(final Message msg, final
SendCallback sendCallback) throws MQClientException,
RemotingException, InterruptedException;
```
- 大致实现
```java
public CompletableFuture<SendResult> sendAsync(Message msg, SendCallback
sendCallback) {
CompletableFuture<SendResult> asyncResult =
CompletableFuture.supplyAsync(() -> {
try {
return this.send(msg);
} catch (MQClientException | MQBrokerException |
InterruptedException | RemotingException e) {
throw new RuntimeException(e);
}
}, getAsyncSenderExecutor());
// 回调中可能对结果进行修改,因此获取回调完成之后的 Future
CompletableFuture<SendResult> resultAfterCallBack =
asyncResult.whenComplete((res, ex) -> {
if (ex instanceof RuntimeException
&& (ex.getCause() instanceof MQClientException
|| ex.getCause() instanceof MQClientException
|| ex.getCause() instanceof MQClientException
|| ex.getCause() instanceof MQClientException)
) {
sendCallback.onException(ex.getCause());
return;
}
if (ex != null) {
sendCallback.onException(ex);
return;
}
sendCallback.onSuccess(res);
});
return resultAfterCallBack;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]