This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 84156084a4 [ISSUE #7321] Refactor NettyRemotingAbstract with unified
future implementation (#7322)
84156084a4 is described below
commit 84156084a4c5228e1d2fe21e068fff330bbc40d1
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Sun Oct 8 11:13:25 2023 +0800
[ISSUE #7321] Refactor NettyRemotingAbstract with unified future
implementation (#7322)
* Refactor NettyRemotingAbstract
* Add invoke with future method
* Deprecate InvokeCallback#operationComplete
* Add operationSuccess and operationException for InvokeCallback
* fix unit test
* fix unit test
* Keep InvokeCallback#operationComplete
* Optimize invokeAsyncImpl operationComplete
* Add unit test for NettyRemotingClient
* fix checkstyle
---
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 147 ++++++----
.../apache/rocketmq/client/impl/MQAdminImpl.java | 71 ++---
.../rocketmq/client/impl/MQClientAPIImpl.java | 239 ++++++++--------
.../client/impl/mqclient/MQClientAPIExt.java | 309 +++++++++------------
.../rocketmq/client/impl/MQClientAPIImplTest.java | 12 +-
.../proxy/remoting/RemotingProtocolServer.java | 22 +-
.../proxy/service/mqclient/MQClientAPIExtTest.java | 97 +++----
.../apache/rocketmq/remoting/InvokeCallback.java | 15 +
.../apache/rocketmq/remoting/RemotingClient.java | 27 +-
.../remoting/netty/NettyRemotingAbstract.java | 123 +++++---
.../remoting/netty/NettyRemotingClient.java | 33 ++-
.../rocketmq/remoting/netty/ResponseFuture.java | 15 +
.../rocketmq/remoting/rpc/RpcClientImpl.java | 29 +-
.../rocketmq/remoting/RemotingServerTest.java | 22 +-
.../rocketmq/remoting/netty/MockChannel.java | 21 +-
.../remoting/netty/MockChannelPromise.java | 191 +++++++++++++
.../remoting/netty/NettyRemotingAbstractTest.java | 54 +++-
.../remoting/netty/NettyRemotingClientTest.java | 185 +++++++++++-
18 files changed, 1029 insertions(+), 583 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 9dfb8127d6..6fde48dd99 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -73,6 +73,7 @@ import
org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
+import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.BrokerSyncInfo;
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -107,6 +108,8 @@ import
org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.remoting.protocol.header.SendMessageResponseHeader;
import
org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
import
org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
import
org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader;
@@ -124,8 +127,6 @@ import
org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterBrokerReques
import
org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterBrokerResponseHeader;
import
org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterTopicRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
-import
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
-import
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
@@ -151,7 +152,6 @@ public class BrokerOuterAPI {
private final RpcClient rpcClient;
private String nameSrvAddr = null;
-
public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) {
this(nettyClientConfig, new DynamicalExtFieldRPCHook(), new
ClientMetadata());
}
@@ -459,7 +459,7 @@ public class BrokerOuterAPI {
* @param filterServerList
* @param oneway
* @param timeoutMills
- * @param compressed default false
+ * @param compressed default false
* @return
*/
public List<RegisterBrokerResult> registerBrokerAll(
@@ -643,7 +643,6 @@ public class BrokerOuterAPI {
queueDatas.add(queueData);
final byte[] topicRouteBody = topicRouteData.encode();
-
List<String> nameServerAddressList =
this.remotingClient.getNameServerAddressList();
final CountDownLatch countDownLatch = new
CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
@@ -910,25 +909,33 @@ public class BrokerOuterAPI {
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
request.setBody(requestBody.encode());
- this.remotingClient.invokeAsync(addr, request, timeoutMillis,
responseFuture -> {
- if (callback == null) {
- return;
+ this.remotingClient.invokeAsync(addr, request, timeoutMillis, new
InvokeCallback() {
+ @Override
+ public void operationComplete(ResponseFuture responseFuture) {
+
}
- try {
- RemotingCommand response = responseFuture.getResponseCommand();
- if (response != null) {
- if (response.getCode() == ResponseCode.SUCCESS) {
- LockBatchResponseBody responseBody =
LockBatchResponseBody.decode(response.getBody(),
- LockBatchResponseBody.class);
- Set<MessageQueue> messageQueues =
responseBody.getLockOKMQSet();
- callback.onSuccess(messageQueues);
- } else {
- callback.onException(new
MQBrokerException(response.getCode(), response.getRemark()));
- }
+ @Override
+ public void operationSucceed(RemotingCommand response) {
+ if (callback == null) {
+ return;
}
- } catch (Throwable ignored) {
+ if (response.getCode() == ResponseCode.SUCCESS) {
+ LockBatchResponseBody responseBody =
LockBatchResponseBody.decode(response.getBody(),
+ LockBatchResponseBody.class);
+ Set<MessageQueue> messageQueues =
responseBody.getLockOKMQSet();
+ callback.onSuccess(messageQueues);
+ } else {
+ callback.onException(new
MQBrokerException(response.getCode(), response.getRemark()));
+ }
+ }
+ @Override
+ public void operationFail(Throwable throwable) {
+ if (callback == null) {
+ return;
+ }
+ callback.onException(throwable);
}
});
}
@@ -942,22 +949,30 @@ public class BrokerOuterAPI {
request.setBody(requestBody.encode());
- this.remotingClient.invokeAsync(addr, request, timeoutMillis,
responseFuture -> {
- if (callback == null) {
- return;
+ this.remotingClient.invokeAsync(addr, request, timeoutMillis, new
InvokeCallback() {
+ @Override
+ public void operationComplete(ResponseFuture responseFuture) {
+
}
- try {
- RemotingCommand response = responseFuture.getResponseCommand();
- if (response != null) {
- if (response.getCode() == ResponseCode.SUCCESS) {
- callback.onSuccess();
- } else {
- callback.onException(new
MQBrokerException(response.getCode(), response.getRemark()));
- }
+ @Override
+ public void operationSucceed(RemotingCommand response) {
+ if (callback == null) {
+ return;
}
- } catch (Throwable ignored) {
+ if (response.getCode() == ResponseCode.SUCCESS) {
+ callback.onSuccess();
+ } else {
+ callback.onException(new
MQBrokerException(response.getCode(), response.getRemark()));
+ }
+ }
+ @Override
+ public void operationFail(Throwable throwable) {
+ if (callback == null) {
+ return;
+ }
+ callback.onException(throwable);
}
});
}
@@ -983,21 +998,27 @@ public class BrokerOuterAPI {
CompletableFuture<SendResult> cf = new CompletableFuture<>();
final String msgId = msg.getMsgId();
try {
- this.remotingClient.invokeAsync(brokerAddr, request,
timeoutMillis, responseFuture -> {
- RemotingCommand response = responseFuture.getResponseCommand();
- if (null != response) {
- SendResult sendResult = null;
+ this.remotingClient.invokeAsync(brokerAddr, request,
timeoutMillis, new InvokeCallback() {
+ @Override
+ public void operationComplete(ResponseFuture responseFuture) {
+
+ }
+
+ @Override
+ public void operationSucceed(RemotingCommand response) {
try {
- sendResult = this.processSendResponse(brokerName, msg,
response);
+ SendResult sendResult =
processSendResponse(brokerName, msg, response);
cf.complete(sendResult);
} catch (MQBrokerException | RemotingCommandException e) {
LOGGER.error("processSendResponse in
sendMessageToSpecificBrokerAsync failed, msgId=" + msgId, e);
cf.completeExceptionally(e);
}
- } else {
- cf.complete(null);
}
+ @Override
+ public void operationFail(Throwable throwable) {
+ cf.completeExceptionally(throwable);
+ }
});
} catch (Throwable t) {
LOGGER.error("invokeAsync failed in
sendMessageToSpecificBrokerAsync, msgId=" + msgId, t);
@@ -1057,7 +1078,7 @@ public class BrokerOuterAPI {
}
if (sendStatus != null) {
SendMessageResponseHeader responseHeader =
- (SendMessageResponseHeader)
response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
+ (SendMessageResponseHeader)
response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
//If namespace not null , reset Topic without namespace.
String topic = msg.getTopic();
@@ -1073,8 +1094,8 @@ public class BrokerOuterAPI {
uniqMsgId = sb.toString();
}
SendResult sendResult = new SendResult(sendStatus,
- uniqMsgId,
- responseHeader.getMsgId(), messageQueue,
responseHeader.getQueueOffset());
+ uniqMsgId,
+ responseHeader.getMsgId(), messageQueue,
responseHeader.getQueueOffset());
sendResult.setTransactionId(responseHeader.getTransactionId());
String regionId =
response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
String traceOn =
response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);
@@ -1218,8 +1239,9 @@ public class BrokerOuterAPI {
/**
* Broker try to elect itself as a master in broker set
*/
- public Pair<ElectMasterResponseHeader, Set<Long>> brokerElect(String
controllerAddress, String clusterName, String brokerName,
- Long brokerId) throws
Exception {
+ public Pair<ElectMasterResponseHeader, Set<Long>> brokerElect(String
controllerAddress, String clusterName,
+ String brokerName,
+ Long brokerId) throws Exception {
final ElectMasterRequestHeader requestHeader =
ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, brokerId);
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_ELECT_MASTER,
requestHeader);
@@ -1237,7 +1259,8 @@ public class BrokerOuterAPI {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
- public GetNextBrokerIdResponseHeader getNextBrokerId(final String
clusterName, final String brokerName, final String controllerAddress) throws
Exception {
+ public GetNextBrokerIdResponseHeader getNextBrokerId(final String
clusterName, final String brokerName,
+ final String controllerAddress) throws Exception {
final GetNextBrokerIdRequestHeader requestHeader = new
GetNextBrokerIdRequestHeader(clusterName, brokerName);
final RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_GET_NEXT_BROKER_ID,
requestHeader);
final RemotingCommand response =
this.remotingClient.invokeSync(controllerAddress, request, 3000);
@@ -1248,7 +1271,8 @@ public class BrokerOuterAPI {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
- public ApplyBrokerIdResponseHeader applyBrokerId(final String clusterName,
final String brokerName, final Long brokerId, final String registerCheckCode,
final String controllerAddress) throws Exception {
+ public ApplyBrokerIdResponseHeader applyBrokerId(final String clusterName,
final String brokerName,
+ final Long brokerId, final String registerCheckCode, final String
controllerAddress) throws Exception {
final ApplyBrokerIdRequestHeader requestHeader = new
ApplyBrokerIdRequestHeader(clusterName, brokerName, brokerId,
registerCheckCode);
final RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_APPLY_BROKER_ID,
requestHeader);
final RemotingCommand response =
this.remotingClient.invokeSync(controllerAddress, request, 3000);
@@ -1259,7 +1283,9 @@ public class BrokerOuterAPI {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
- public Pair<RegisterBrokerToControllerResponseHeader, Set<Long>>
registerBrokerToController(final String clusterName, final String brokerName,
final Long brokerId, final String brokerAddress, final String
controllerAddress) throws Exception {
+ public Pair<RegisterBrokerToControllerResponseHeader, Set<Long>>
registerBrokerToController(
+ final String clusterName, final String brokerName, final Long
brokerId, final String brokerAddress,
+ final String controllerAddress) throws Exception {
final RegisterBrokerToControllerRequestHeader requestHeader = new
RegisterBrokerToControllerRequestHeader(clusterName, brokerName, brokerId,
brokerAddress);
final RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_REGISTER_BROKER,
requestHeader);
final RemotingCommand response =
this.remotingClient.invokeSync(controllerAddress, request, 3000);
@@ -1355,16 +1381,25 @@ public class BrokerOuterAPI {
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
CompletableFuture<PullResult> pullResultFuture = new
CompletableFuture<>();
- this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis,
responseFuture -> {
- if (responseFuture.getCause() != null) {
- pullResultFuture.complete(new
PullResult(PullStatus.NO_MATCHED_MSG, -1, -1, -1, new ArrayList<>()));
- return;
+ this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis,
new InvokeCallback() {
+ @Override
+ public void operationComplete(ResponseFuture responseFuture) {
+
}
- try {
- PullResultExt pullResultExt =
this.processPullResponse(responseFuture.getResponseCommand(), brokerAddr);
- this.processPullResult(pullResultExt, brokerName, queueId);
- pullResultFuture.complete(pullResultExt);
- } catch (Exception e) {
+
+ @Override
+ public void operationSucceed(RemotingCommand response) {
+ try {
+ PullResultExt pullResultExt =
processPullResponse(response, brokerAddr);
+ processPullResult(pullResultExt, brokerName, queueId);
+ pullResultFuture.complete(pullResultExt);
+ } catch (Exception e) {
+ pullResultFuture.complete(new
PullResult(PullStatus.NO_MATCHED_MSG, -1, -1, -1, new ArrayList<>()));
+ }
+ }
+
+ @Override
+ public void operationFail(Throwable throwable) {
pullResultFuture.complete(new
PullResult(PullStatus.NO_MATCHED_MSG, -1, -1, -1, new ArrayList<>()));
}
});
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
index 1ef3a94835..83835bd3d3 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
@@ -44,6 +44,8 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.utils.NetworkUtil;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -55,8 +57,6 @@ import
org.apache.rocketmq.remoting.protocol.header.QueryMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.QueryMessageResponseHeader;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
public class MQAdminImpl {
@@ -357,44 +357,51 @@ public class MQAdminImpl {
new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture
responseFuture) {
+
+ }
+
+ @Override
+ public void operationSucceed(RemotingCommand
response) {
try {
- RemotingCommand response =
responseFuture.getResponseCommand();
- if (response != null) {
- switch (response.getCode()) {
- case ResponseCode.SUCCESS: {
- QueryMessageResponseHeader
responseHeader = null;
- try {
- responseHeader =
-
(QueryMessageResponseHeader) response
-
.decodeCommandCustomHeader(QueryMessageResponseHeader.class);
- } catch
(RemotingCommandException e) {
-
log.error("decodeCommandCustomHeader exception", e);
- return;
- }
-
- List<MessageExt> wrappers =
-
MessageDecoder.decodes(ByteBuffer.wrap(response.getBody()), true);
-
- QueryResult qr = new
QueryResult(responseHeader.getIndexLastUpdateTimestamp(), wrappers);
- try {
-
lock.writeLock().lock();
-
queryResultList.add(qr);
- } finally {
-
lock.writeLock().unlock();
- }
- break;
+ switch (response.getCode()) {
+ case ResponseCode.SUCCESS: {
+ QueryMessageResponseHeader
responseHeader = null;
+ try {
+ responseHeader =
+
(QueryMessageResponseHeader) response
+
.decodeCommandCustomHeader(QueryMessageResponseHeader.class);
+ } catch
(RemotingCommandException e) {
+
log.error("decodeCommandCustomHeader exception", e);
+ return;
}
- default:
-
log.warn("getResponseCommand failed, {} {}", response.getCode(),
response.getRemark());
- break;
+
+ List<MessageExt> wrappers =
+
MessageDecoder.decodes(ByteBuffer.wrap(response.getBody()), true);
+
+ QueryResult qr = new
QueryResult(responseHeader.getIndexLastUpdateTimestamp(), wrappers);
+ try {
+ lock.writeLock().lock();
+ queryResultList.add(qr);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ break;
}
- } else {
- log.warn("getResponseCommand
return null");
+ default:
+ log.warn("getResponseCommand
failed, {} {}", response.getCode(), response.getRemark());
+ break;
}
+
} finally {
countDownLatch.countDown();
}
}
+
+ @Override
+ public void operationFail(Throwable throwable)
{
+ log.error("queryMessage error,
requestHeader={}", requestHeader);
+ countDownLatch.countDown();
+ }
}, isUniqKey);
} catch (Exception e) {
log.warn("queryMessage exception", e);
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 3201a493f7..2407e57373 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.Validators;
-import org.apache.rocketmq.client.common.ClientErrorCode;
import org.apache.rocketmq.client.consumer.AckCallback;
import org.apache.rocketmq.client.consumer.AckResult;
import org.apache.rocketmq.client.consumer.AckStatus;
@@ -653,10 +652,13 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback {
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new
InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
- long cost = System.currentTimeMillis() - beginStartTime;
- RemotingCommand response =
responseFuture.getResponseCommand();
- if (null == sendCallback && response != null) {
+ }
+
+ @Override
+ public void operationSucceed(RemotingCommand response) {
+ long cost = System.currentTimeMillis() - beginStartTime;
+ if (null == sendCallback) {
try {
SendResult sendResult =
MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
if (context != null && sendResult != null) {
@@ -666,46 +668,47 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback {
} catch (Throwable e) {
}
- producer.updateFaultItem(brokerName,
System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false, true);
+ producer.updateFaultItem(brokerName,
System.currentTimeMillis() - beginStartTime, false, true);
return;
}
- if (response != null) {
+ try {
+ SendResult sendResult =
MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
+ assert sendResult != null;
+ if (context != null) {
+ context.setSendResult(sendResult);
+
context.getProducer().executeSendMessageHookAfter(context);
+ }
+
try {
- SendResult sendResult =
MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
- assert sendResult != null;
- if (context != null) {
- context.setSendResult(sendResult);
-
context.getProducer().executeSendMessageHookAfter(context);
- }
+ sendCallback.onSuccess(sendResult);
+ } catch (Throwable e) {
+ }
- try {
- sendCallback.onSuccess(sendResult);
- } catch (Throwable e) {
- }
+ producer.updateFaultItem(brokerName,
System.currentTimeMillis() - beginStartTime, false, true);
+ } catch (Exception e) {
+ producer.updateFaultItem(brokerName,
System.currentTimeMillis() - beginStartTime, true, true);
+ onExceptionImpl(brokerName, msg, timeoutMillis - cost,
request, sendCallback, topicPublishInfo, instance,
+ retryTimesWhenSendFailed, times, e, context,
false, producer);
+ }
+ }
- producer.updateFaultItem(brokerName,
System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false, true);
- } catch (Exception e) {
- producer.updateFaultItem(brokerName,
System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true, true);
- onExceptionImpl(brokerName, msg, timeoutMillis -
cost, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, e, context,
false, producer);
- }
+ @Override
+ public void operationFail(Throwable throwable) {
+ producer.updateFaultItem(brokerName,
System.currentTimeMillis() - beginStartTime, true, true);
+ long cost = System.currentTimeMillis() - beginStartTime;
+ if (throwable instanceof RemotingSendRequestException) {
+ MQClientException ex = new MQClientException("send
request failed", throwable);
+ onExceptionImpl(brokerName, msg, timeoutMillis - cost,
request, sendCallback, topicPublishInfo, instance,
+ retryTimesWhenSendFailed, times, ex, context,
true, producer);
+ } else if (throwable instanceof RemotingTimeoutException) {
+ MQClientException ex = new MQClientException("wait
response timeout, cost=" + cost, throwable);
+ onExceptionImpl(brokerName, msg, timeoutMillis - cost,
request, sendCallback, topicPublishInfo, instance,
+ retryTimesWhenSendFailed, times, ex, context,
true, producer);
} else {
- producer.updateFaultItem(brokerName,
System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true, true);
- if (!responseFuture.isSendRequestOK()) {
- MQClientException ex = new MQClientException("send
request failed", responseFuture.getCause());
- onExceptionImpl(brokerName, msg, timeoutMillis -
cost, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, ex, context,
true, producer);
- } else if (responseFuture.isTimeout()) {
- MQClientException ex = new MQClientException("wait
response timeout " + responseFuture.getTimeoutMillis() + "ms",
- responseFuture.getCause());
- onExceptionImpl(brokerName, msg, timeoutMillis -
cost, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, ex, context,
true, producer);
- } else {
- MQClientException ex = new
MQClientException("unknow reseaon", responseFuture.getCause());
- onExceptionImpl(brokerName, msg, timeoutMillis -
cost, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, ex, context,
true, producer);
- }
+ MQClientException ex = new MQClientException("unknow
reseaon", throwable);
+ onExceptionImpl(brokerName, msg, timeoutMillis - cost,
request, sendCallback, topicPublishInfo, instance,
+ retryTimesWhenSendFailed, times, ex, context,
true, producer);
}
}
});
@@ -857,30 +860,25 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback {
final long timeoutMillis, final PopCallback popCallback
) throws RemotingException, InterruptedException {
final RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.POP_MESSAGE, requestHeader);
- this.remotingClient.invokeAsync(addr, request, timeoutMillis, new
BaseInvokeCallback(MQClientAPIImpl.this) {
+ this.remotingClient.invokeAsync(addr, request, timeoutMillis, new
InvokeCallback() {
@Override
- public void onComplete(ResponseFuture responseFuture) {
- RemotingCommand response = responseFuture.getResponseCommand();
- if (response != null) {
- try {
- PopResult
- popResult =
MQClientAPIImpl.this.processPopResponse(brokerName, response,
requestHeader.getTopic(), requestHeader);
- assert popResult != null;
- popCallback.onSuccess(popResult);
- } catch (Exception e) {
- popCallback.onException(e);
- }
- } else {
- if (!responseFuture.isSendRequestOK()) {
- popCallback.onException(new
MQClientException(ClientErrorCode.CONNECT_BROKER_EXCEPTION, "send request
failed to " + addr + ". Request: " + request, responseFuture.getCause()));
- } else if (responseFuture.isTimeout()) {
- popCallback.onException(new
MQClientException(ClientErrorCode.ACCESS_BROKER_TIMEOUT, "wait response from "
+ addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request:
" + request,
- responseFuture.getCause()));
- } else {
- popCallback.onException(new MQClientException("unknown
reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " +
request, responseFuture.getCause()));
- }
+ public void operationComplete(ResponseFuture responseFuture) {
+
+ }
+
+ @Override
+ public void operationSucceed(RemotingCommand response) {
+ try {
+ PopResult popResult =
MQClientAPIImpl.this.processPopResponse(brokerName, response,
requestHeader.getTopic(), requestHeader);
+ popCallback.onSuccess(popResult);
+ } catch (Exception e) {
+ popCallback.onException(e);
}
}
+ @Override
+ public void operationFail(Throwable throwable) {
+ popCallback.onException(throwable);
+ }
});
}
@@ -959,34 +957,26 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback {
request.setBody(requestBody.encode());
}
}
- this.remotingClient.invokeAsync(addr, request, timeOut, new
BaseInvokeCallback(MQClientAPIImpl.this) {
+ this.remotingClient.invokeAsync(addr, request, timeOut, new
InvokeCallback() {
+ @Override
+ public void operationComplete(ResponseFuture responseFuture) {
+
+ }
@Override
- public void onComplete(ResponseFuture responseFuture) {
- RemotingCommand response = responseFuture.getResponseCommand();
- if (response != null) {
- try {
- AckResult ackResult = new AckResult();
- if (ResponseCode.SUCCESS == response.getCode()) {
- ackResult.setStatus(AckStatus.OK);
- } else {
- ackResult.setStatus(AckStatus.NO_EXIST);
- }
- ackCallback.onSuccess(ackResult);
- } catch (Exception e) {
- ackCallback.onException(e);
- }
+ public void operationSucceed(RemotingCommand response) {
+ AckResult ackResult = new AckResult();
+ if (ResponseCode.SUCCESS == response.getCode()) {
+ ackResult.setStatus(AckStatus.OK);
} else {
- if (!responseFuture.isSendRequestOK()) {
- ackCallback.onException(new
MQClientException(ClientErrorCode.CONNECT_BROKER_EXCEPTION, "send request
failed to " + addr + ". Request: " + request, responseFuture.getCause()));
- } else if (responseFuture.isTimeout()) {
- ackCallback.onException(new
MQClientException(ClientErrorCode.ACCESS_BROKER_TIMEOUT, "wait response from "
+ addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request:
" + request,
- responseFuture.getCause()));
- } else {
- ackCallback.onException(new MQClientException("unknown
reason. addr: " + addr + ", timeoutMillis: " + timeOut + ". Request: " +
request, responseFuture.getCause()));
- }
+ ackResult.setStatus(AckStatus.NO_EXIST);
}
+ ackCallback.onSuccess(ackResult);
+ }
+ @Override
+ public void operationFail(Throwable throwable) {
+ ackCallback.onException(throwable);
}
});
}
@@ -999,39 +989,37 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback {
final AckCallback ackCallback
) throws RemotingException, MQBrokerException, InterruptedException {
final RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME,
requestHeader);
- this.remotingClient.invokeAsync(addr, request, timeoutMillis, new
BaseInvokeCallback(MQClientAPIImpl.this) {
+ this.remotingClient.invokeAsync(addr, request, timeoutMillis, new
InvokeCallback() {
@Override
- public void onComplete(ResponseFuture responseFuture) {
- RemotingCommand response = responseFuture.getResponseCommand();
- if (response != null) {
- try {
- ChangeInvisibleTimeResponseHeader responseHeader =
(ChangeInvisibleTimeResponseHeader)
response.decodeCommandCustomHeader(ChangeInvisibleTimeResponseHeader.class);
- AckResult ackResult = new AckResult();
- if (ResponseCode.SUCCESS == response.getCode()) {
- ackResult.setStatus(AckStatus.OK);
- ackResult.setPopTime(responseHeader.getPopTime());
- ackResult.setExtraInfo(ExtraInfoUtil
- .buildExtraInfo(requestHeader.getOffset(),
responseHeader.getPopTime(), responseHeader.getInvisibleTime(),
- responseHeader.getReviveQid(),
requestHeader.getTopic(), brokerName, requestHeader.getQueueId()) +
MessageConst.KEY_SEPARATOR
- + requestHeader.getOffset());
- } else {
- ackResult.setStatus(AckStatus.NO_EXIST);
- }
- ackCallback.onSuccess(ackResult);
- } catch (Exception e) {
- ackCallback.onException(e);
- }
- } else {
- if (!responseFuture.isSendRequestOK()) {
- ackCallback.onException(new
MQClientException(ClientErrorCode.CONNECT_BROKER_EXCEPTION, "send request
failed to " + addr + ". Request: " + request, responseFuture.getCause()));
- } else if (responseFuture.isTimeout()) {
- ackCallback.onException(new
MQClientException(ClientErrorCode.ACCESS_BROKER_TIMEOUT, "wait response from "
+ addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request:
" + request,
- responseFuture.getCause()));
+ public void operationComplete(ResponseFuture responseFuture) {
+
+ }
+
+ @Override
+ public void operationSucceed(RemotingCommand response) {
+ try {
+ ChangeInvisibleTimeResponseHeader responseHeader =
(ChangeInvisibleTimeResponseHeader)
response.decodeCommandCustomHeader(ChangeInvisibleTimeResponseHeader.class);
+ AckResult ackResult = new AckResult();
+ if (ResponseCode.SUCCESS == response.getCode()) {
+ ackResult.setStatus(AckStatus.OK);
+ ackResult.setPopTime(responseHeader.getPopTime());
+ ackResult.setExtraInfo(ExtraInfoUtil
+ .buildExtraInfo(requestHeader.getOffset(),
responseHeader.getPopTime(), responseHeader.getInvisibleTime(),
+ responseHeader.getReviveQid(),
requestHeader.getTopic(), brokerName, requestHeader.getQueueId()) +
MessageConst.KEY_SEPARATOR
+ + requestHeader.getOffset());
} else {
- ackCallback.onException(new MQClientException("unknown
reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " +
request, responseFuture.getCause()));
+ ackResult.setStatus(AckStatus.NO_EXIST);
}
+ ackCallback.onSuccess(ackResult);
+ } catch (Exception e) {
+ ackCallback.onException(e);
}
}
+
+ @Override
+ public void operationFail(Throwable throwable) {
+ ackCallback.onException(throwable);
+ }
});
}
@@ -1044,26 +1032,23 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback {
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new
InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
- RemotingCommand response = responseFuture.getResponseCommand();
- if (response != null) {
- try {
- PullResult pullResult =
MQClientAPIImpl.this.processPullResponse(response, addr);
- assert pullResult != null;
- pullCallback.onSuccess(pullResult);
- } catch (Exception e) {
- pullCallback.onException(e);
- }
- } else {
- if (!responseFuture.isSendRequestOK()) {
- pullCallback.onException(new
MQClientException(ClientErrorCode.CONNECT_BROKER_EXCEPTION, "send request
failed to " + addr + ". Request: " + request, responseFuture.getCause()));
- } else if (responseFuture.isTimeout()) {
- pullCallback.onException(new
MQClientException(ClientErrorCode.ACCESS_BROKER_TIMEOUT, "wait response from "
+ addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request:
" + request,
- responseFuture.getCause()));
- } else {
- pullCallback.onException(new
MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " +
timeoutMillis + ". Request: " + request, responseFuture.getCause()));
- }
+
+ }
+
+ @Override
+ public void operationSucceed(RemotingCommand response) {
+ try {
+ PullResult pullResult =
MQClientAPIImpl.this.processPullResponse(response, addr);
+ pullCallback.onSuccess(pullResult);
+ } catch (Exception e) {
+ pullCallback.onException(e);
}
}
+
+ @Override
+ public void operationFail(Throwable throwable) {
+ pullCallback.onException(throwable);
+ }
});
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
index d7c8ef8d92..f3102e1759 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
@@ -30,7 +30,6 @@ import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.exception.OffsetNotFoundException;
import org.apache.rocketmq.client.impl.ClientRemotingProcessor;
import org.apache.rocketmq.client.impl.CommunicationMode;
@@ -47,6 +46,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
@@ -106,19 +106,6 @@ public class MQClientAPIExt extends MQClientAPIImpl {
return false;
}
- protected static MQClientException processNullResponseErr(ResponseFuture
responseFuture) {
- MQClientException ex;
- if (!responseFuture.isSendRequestOK()) {
- ex = new MQClientException("send request failed",
responseFuture.getCause());
- } else if (responseFuture.isTimeout()) {
- ex = new MQClientException("wait response timeout " +
responseFuture.getTimeoutMillis() + "ms",
- responseFuture.getCause());
- } else {
- ex = new MQClientException("unknown reason",
responseFuture.getCause());
- }
- return ex;
- }
-
public CompletableFuture<Void> sendHeartbeatOneway(
String brokerAddr,
HeartbeatData heartbeatData,
@@ -146,24 +133,15 @@ public class MQClientAPIExt extends MQClientAPIImpl {
request.setLanguage(clientConfig.getLanguage());
request.setBody(heartbeatData.encode());
- CompletableFuture<Integer> future = new CompletableFuture<>();
- try {
- this.getRemotingClient().invokeAsync(brokerAddr, request,
timeoutMillis, responseFuture -> {
- RemotingCommand response = responseFuture.getResponseCommand();
- if (response != null) {
- if (ResponseCode.SUCCESS == response.getCode()) {
- future.complete(response.getVersion());
- } else {
- future.completeExceptionally(new
MQBrokerException(response.getCode(), response.getRemark(), brokerAddr));
- }
- } else {
-
future.completeExceptionally(processNullResponseErr(responseFuture));
- }
- });
- } catch (Throwable t) {
- future.completeExceptionally(t);
- }
- return future;
+ return this.getRemotingClient().invoke(brokerAddr, request,
timeoutMillis).thenCompose(response -> {
+ CompletableFuture<Integer> future0 = new CompletableFuture<>();
+ if (ResponseCode.SUCCESS == response.getCode()) {
+ future0.complete(response.getVersion());
+ } else {
+ future0.completeExceptionally(new
MQBrokerException(response.getCode(), response.getRemark(), brokerAddr));
+ }
+ return future0;
+ });
}
public CompletableFuture<SendResult> sendMessageAsync(
@@ -177,24 +155,15 @@ public class MQClientAPIExt extends MQClientAPIImpl {
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2,
requestHeaderV2);
request.setBody(msg.getBody());
- CompletableFuture<SendResult> future = new CompletableFuture<>();
- try {
- this.getRemotingClient().invokeAsync(brokerAddr, request,
timeoutMillis, responseFuture -> {
- RemotingCommand response = responseFuture.getResponseCommand();
- if (response != null) {
- try {
- future.complete(this.processSendResponse(brokerName,
msg, response, brokerAddr));
- } catch (Exception e) {
- future.completeExceptionally(e);
- }
- } else {
-
future.completeExceptionally(processNullResponseErr(responseFuture));
- }
- });
- } catch (Throwable t) {
- future.completeExceptionally(t);
- }
- return future;
+ return this.getRemotingClient().invoke(brokerAddr, request,
timeoutMillis).thenCompose(response -> {
+ CompletableFuture<SendResult> future0 = new CompletableFuture<>();
+ try {
+ future0.complete(this.processSendResponse(brokerName, msg,
response, brokerAddr));
+ } catch (Exception e) {
+ future0.completeExceptionally(e);
+ }
+ return future0;
+ });
}
public CompletableFuture<SendResult> sendMessageAsync(
@@ -216,17 +185,14 @@ public class MQClientAPIExt extends MQClientAPIImpl {
msgBatch.setBody(body);
request.setBody(body);
- this.getRemotingClient().invokeAsync(brokerAddr, request,
timeoutMillis, responseFuture -> {
- RemotingCommand response = responseFuture.getResponseCommand();
- if (response != null) {
- try {
- future.complete(this.processSendResponse(brokerName,
msgBatch, response, brokerAddr));
- } catch (Exception e) {
- future.completeExceptionally(e);
- }
- } else {
-
future.completeExceptionally(processNullResponseErr(responseFuture));
+ return this.getRemotingClient().invoke(brokerAddr, request,
timeoutMillis).thenCompose(response -> {
+ CompletableFuture<SendResult> future0 = new
CompletableFuture<>();
+ try {
+ future0.complete(processSendResponse(brokerName, msgBatch,
response, brokerAddr));
+ } catch (Exception e) {
+ future0.completeExceptionally(e);
}
+ return future0;
});
} catch (Throwable t) {
future.completeExceptionally(t);
@@ -240,21 +206,7 @@ public class MQClientAPIExt extends MQClientAPIImpl {
long timeoutMillis
) {
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK,
requestHeader);
-
- CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
- try {
- this.getRemotingClient().invokeAsync(brokerAddr, request,
timeoutMillis, responseFuture -> {
- RemotingCommand response = responseFuture.getResponseCommand();
- if (response != null) {
- future.complete(response);
- } else {
-
future.completeExceptionally(processNullResponseErr(responseFuture));
- }
- });
- } catch (Throwable t) {
- future.completeExceptionally(t);
- }
- return future;
+ return this.getRemotingClient().invoke(brokerAddr, request,
timeoutMillis);
}
public CompletableFuture<PopResult> popMessageAsync(
@@ -402,38 +354,31 @@ public class MQClientAPIExt extends MQClientAPIImpl {
QueryConsumerOffsetRequestHeader requestHeader,
long timeoutMillis
) {
- CompletableFuture<Long> future = new CompletableFuture<>();
- try {
- RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET,
requestHeader);
- this.getRemotingClient().invokeAsync(brokerAddr, request,
timeoutMillis, responseFuture -> {
- RemotingCommand response = responseFuture.getResponseCommand();
- if (response != null) {
- switch (response.getCode()) {
- case ResponseCode.SUCCESS: {
- try {
- QueryConsumerOffsetResponseHeader
responseHeader =
- (QueryConsumerOffsetResponseHeader)
response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
- future.complete(responseHeader.getOffset());
- } catch (RemotingCommandException e) {
- future.completeExceptionally(e);
- }
- break;
- }
- case ResponseCode.QUERY_NOT_FOUND: {
- future.completeExceptionally(new
OffsetNotFoundException(response.getCode(), response.getRemark(), brokerAddr));
- break;
- }
- default:
- break;
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET,
requestHeader);
+ return this.getRemotingClient().invoke(brokerAddr, request,
timeoutMillis).thenCompose(response -> {
+ CompletableFuture<Long> future0 = new CompletableFuture<>();
+ switch (response.getCode()) {
+ case ResponseCode.SUCCESS: {
+ try {
+ QueryConsumerOffsetResponseHeader responseHeader =
+ (QueryConsumerOffsetResponseHeader)
response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
+ future0.complete(responseHeader.getOffset());
+ } catch (RemotingCommandException e) {
+ future0.completeExceptionally(e);
}
- } else {
-
future.completeExceptionally(processNullResponseErr(responseFuture));
+ break;
}
- });
- } catch (Throwable t) {
- future.completeExceptionally(t);
- }
- return future;
+ case ResponseCode.QUERY_NOT_FOUND: {
+ future0.completeExceptionally(new
OffsetNotFoundException(response.getCode(), response.getRemark(), brokerAddr));
+ break;
+ }
+ default: {
+ future0.completeExceptionally(new
MQBrokerException(response.getCode(), response.getRemark()));
+ break;
+ }
+ }
+ return future0;
+ });
}
public CompletableFuture<Void> updateConsumerOffsetOneWay(
@@ -461,9 +406,14 @@ public class MQClientAPIExt extends MQClientAPIImpl {
CompletableFuture<List<String>> future = new CompletableFuture<>();
try {
- this.getRemotingClient().invokeAsync(brokerAddr, request,
timeoutMillis, responseFuture -> {
- RemotingCommand response = responseFuture.getResponseCommand();
- if (response != null) {
+ this.getRemotingClient().invokeAsync(brokerAddr, request,
timeoutMillis, new InvokeCallback() {
+ @Override
+ public void operationComplete(ResponseFuture responseFuture) {
+
+ }
+
+ @Override
+ public void operationSucceed(RemotingCommand response) {
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
if (response.getBody() != null) {
@@ -485,8 +435,11 @@ public class MQClientAPIExt extends MQClientAPIImpl {
break;
}
future.completeExceptionally(new
MQBrokerException(response.getCode(), response.getRemark()));
- } else {
-
future.completeExceptionally(processNullResponseErr(responseFuture));
+ }
+
+ @Override
+ public void operationFail(Throwable throwable) {
+ future.completeExceptionally(throwable);
}
});
} catch (Throwable t) {
@@ -501,9 +454,14 @@ public class MQClientAPIExt extends MQClientAPIImpl {
CompletableFuture<Long> future = new CompletableFuture<>();
try {
- this.getRemotingClient().invokeAsync(brokerAddr, request,
timeoutMillis, responseFuture -> {
- RemotingCommand response = responseFuture.getResponseCommand();
- if (response != null) {
+ this.getRemotingClient().invokeAsync(brokerAddr, request,
timeoutMillis, new InvokeCallback() {
+ @Override
+ public void operationComplete(ResponseFuture responseFuture) {
+
+ }
+
+ @Override
+ public void operationSucceed(RemotingCommand response) {
if (ResponseCode.SUCCESS == response.getCode()) {
try {
GetMaxOffsetResponseHeader responseHeader =
(GetMaxOffsetResponseHeader)
response.decodeCommandCustomHeader(GetMaxOffsetResponseHeader.class);
@@ -513,8 +471,11 @@ public class MQClientAPIExt extends MQClientAPIImpl {
}
}
future.completeExceptionally(new
MQBrokerException(response.getCode(), response.getRemark()));
- } else {
-
future.completeExceptionally(processNullResponseErr(responseFuture));
+ }
+
+ @Override
+ public void operationFail(Throwable throwable) {
+ future.completeExceptionally(throwable);
}
});
} catch (Throwable t) {
@@ -529,9 +490,14 @@ public class MQClientAPIExt extends MQClientAPIImpl {
CompletableFuture<Long> future = new CompletableFuture<>();
try {
- this.getRemotingClient().invokeAsync(brokerAddr, request,
timeoutMillis, responseFuture -> {
- RemotingCommand response = responseFuture.getResponseCommand();
- if (response != null) {
+ this.getRemotingClient().invokeAsync(brokerAddr, request,
timeoutMillis, new InvokeCallback() {
+ @Override
+ public void operationComplete(ResponseFuture responseFuture) {
+
+ }
+
+ @Override
+ public void operationSucceed(RemotingCommand response) {
if (ResponseCode.SUCCESS == response.getCode()) {
try {
GetMinOffsetResponseHeader responseHeader =
(GetMinOffsetResponseHeader)
response.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
@@ -541,8 +507,11 @@ public class MQClientAPIExt extends MQClientAPIImpl {
}
}
future.completeExceptionally(new
MQBrokerException(response.getCode(), response.getRemark()));
- } else {
-
future.completeExceptionally(processNullResponseErr(responseFuture));
+ }
+
+ @Override
+ public void operationFail(Throwable throwable) {
+ future.completeExceptionally(throwable);
}
});
} catch (Throwable t) {
@@ -555,57 +524,41 @@ public class MQClientAPIExt extends MQClientAPIImpl {
long timeoutMillis) {
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP,
requestHeader);
- CompletableFuture<Long> future = new CompletableFuture<>();
- try {
- this.getRemotingClient().invokeAsync(brokerAddr, request,
timeoutMillis, responseFuture -> {
- RemotingCommand response = responseFuture.getResponseCommand();
- if (response != null) {
- if (response.getCode() == ResponseCode.SUCCESS) {
- try {
- SearchOffsetResponseHeader responseHeader =
(SearchOffsetResponseHeader)
response.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
- future.complete(responseHeader.getOffset());
- } catch (Throwable t) {
- future.completeExceptionally(t);
- }
- }
- future.completeExceptionally(new
MQBrokerException(response.getCode(), response.getRemark()));
- } else {
-
future.completeExceptionally(processNullResponseErr(responseFuture));
+ return this.getRemotingClient().invoke(brokerAddr, request,
timeoutMillis).thenCompose(response -> {
+ CompletableFuture<Long> future0 = new CompletableFuture<>();
+ if (response.getCode() == ResponseCode.SUCCESS) {
+ try {
+ SearchOffsetResponseHeader responseHeader =
(SearchOffsetResponseHeader)
response.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
+ future0.complete(responseHeader.getOffset());
+ } catch (Throwable t) {
+ future0.completeExceptionally(t);
}
- });
- } catch (Throwable t) {
- future.completeExceptionally(t);
- }
- return future;
+ } else {
+ future0.completeExceptionally(new
MQBrokerException(response.getCode(), response.getRemark()));
+ }
+ return future0;
+ });
}
public CompletableFuture<Set<MessageQueue>> lockBatchMQWithFuture(String
brokerAddr,
LockBatchRequestBody requestBody, long timeoutMillis) {
- CompletableFuture<Set<MessageQueue>> future = new
CompletableFuture<>();
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
request.setBody(requestBody.encode());
- try {
- this.getRemotingClient().invokeAsync(brokerAddr, request,
timeoutMillis, responseFuture -> {
- RemotingCommand response = responseFuture.getResponseCommand();
- if (response != null) {
- if (response.getCode() == ResponseCode.SUCCESS) {
- try {
- LockBatchResponseBody responseBody =
LockBatchResponseBody.decode(response.getBody(), LockBatchResponseBody.class);
- Set<MessageQueue> messageQueues =
responseBody.getLockOKMQSet();
- future.complete(messageQueues);
- } catch (Throwable t) {
- future.completeExceptionally(t);
- }
- }
- future.completeExceptionally(new
MQBrokerException(response.getCode(), response.getRemark()));
- } else {
-
future.completeExceptionally(processNullResponseErr(responseFuture));
+ return this.getRemotingClient().invoke(brokerAddr, request,
timeoutMillis).thenCompose(response -> {
+ CompletableFuture<Set<MessageQueue>> future0 = new
CompletableFuture<>();
+ if (response.getCode() == ResponseCode.SUCCESS) {
+ try {
+ LockBatchResponseBody responseBody =
LockBatchResponseBody.decode(response.getBody(), LockBatchResponseBody.class);
+ Set<MessageQueue> messageQueues =
responseBody.getLockOKMQSet();
+ future0.complete(messageQueues);
+ } catch (Throwable t) {
+ future0.completeExceptionally(t);
}
- });
- } catch (Exception e) {
- future.completeExceptionally(e);
- }
- return future;
+ } else {
+ future0.completeExceptionally(new
MQBrokerException(response.getCode(), response.getRemark()));
+ }
+ return future0;
+ });
}
public CompletableFuture<Void> unlockBatchMQOneway(String brokerAddr,
@@ -624,25 +577,21 @@ public class MQClientAPIExt extends MQClientAPIImpl {
public CompletableFuture<Boolean> notification(String brokerAddr,
NotificationRequestHeader requestHeader,
long timeoutMillis) {
- CompletableFuture<Boolean> future = new CompletableFuture<>();
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.NOTIFICATION, requestHeader);
- try {
- this.getRemotingClient().invoke(brokerAddr, request,
timeoutMillis).thenAccept(response -> {
- if (response.getCode() == ResponseCode.SUCCESS) {
- try {
- NotificationResponseHeader responseHeader =
(NotificationResponseHeader)
response.decodeCommandCustomHeader(NotificationResponseHeader.class);
- future.complete(responseHeader.isHasMsg());
- } catch (Throwable t) {
- future.completeExceptionally(t);
- }
- } else {
- future.completeExceptionally(new
MQBrokerException(response.getCode(), response.getRemark()));
+ return this.getRemotingClient().invoke(brokerAddr, request,
timeoutMillis).thenCompose(response -> {
+ CompletableFuture<Boolean> future0 = new CompletableFuture<>();
+ if (response.getCode() == ResponseCode.SUCCESS) {
+ try {
+ NotificationResponseHeader responseHeader =
(NotificationResponseHeader)
response.decodeCommandCustomHeader(NotificationResponseHeader.class);
+ future0.complete(responseHeader.isHasMsg());
+ } catch (Throwable t) {
+ future0.completeExceptionally(t);
}
- });
- } catch (Throwable t) {
- future.completeExceptionally(t);
- }
- return future;
+ } else {
+ future0.completeExceptionally(new
MQBrokerException(response.getCode(), response.getRemark()));
+ }
+ return future0;
+ });
}
public CompletableFuture<RemotingCommand> invoke(String brokerAddr,
RemotingCommand request, long timeoutMillis) {
diff --git
a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
index d13f2cfe43..c152d38ea5 100644
---
a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
@@ -212,7 +212,7 @@ public class MQClientAPIImplTest {
RemotingCommand request = mock.getArgument(1);
ResponseFuture responseFuture = new ResponseFuture(null,
request.getOpaque(), 3 * 1000, null, null);
responseFuture.setResponseCommand(createSendMessageSuccessResponse(request));
- callback.operationComplete(responseFuture);
+ callback.operationSucceed(responseFuture.getResponseCommand());
return null;
}
}).when(remotingClient).invokeAsync(anyString(),
any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
@@ -386,7 +386,7 @@ public class MQClientAPIImplTest {
RemotingCommand request = mock.getArgument(1);
ResponseFuture responseFuture = new ResponseFuture(null,
request.getOpaque(), 3 * 1000, null, null);
responseFuture.setResponseCommand(createSendMessageSuccessResponse(request));
- callback.operationComplete(responseFuture);
+ callback.operationSucceed(responseFuture.getResponseCommand());
return null;
}
}).when(remotingClient).invokeAsync(Matchers.anyString(),
Matchers.any(RemotingCommand.class), Matchers.anyLong(),
Matchers.any(InvokeCallback.class));
@@ -472,7 +472,7 @@ public class MQClientAPIImplTest {
message.putUserProperty("key", "value");
response.setBody(MessageDecoder.encode(message, false));
responseFuture.setResponseCommand(response);
- callback.operationComplete(responseFuture);
+ callback.operationSucceed(responseFuture.getResponseCommand());
return null;
}
}).when(remotingClient).invokeAsync(anyString(),
any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
@@ -543,7 +543,7 @@ public class MQClientAPIImplTest {
message.getProperties().put(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET,
String.valueOf(0));
response.setBody(MessageDecoder.encode(message, false));
responseFuture.setResponseCommand(response);
- callback.operationComplete(responseFuture);
+ callback.operationSucceed(responseFuture.getResponseCommand());
return null;
}
}).when(remotingClient).invokeAsync(anyString(),
any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
@@ -585,7 +585,7 @@ public class MQClientAPIImplTest {
response.setOpaque(request.getOpaque());
response.setCode(ResponseCode.SUCCESS);
responseFuture.setResponseCommand(response);
- callback.operationComplete(responseFuture);
+ callback.operationSucceed(responseFuture.getResponseCommand());
return null;
}
}).when(remotingClient).invokeAsync(anyString(),
any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
@@ -622,7 +622,7 @@ public class MQClientAPIImplTest {
responseHeader.setPopTime(System.currentTimeMillis());
responseHeader.setInvisibleTime(10 * 1000L);
responseFuture.setResponseCommand(response);
- callback.operationComplete(responseFuture);
+ callback.operationSucceed(responseFuture.getResponseCommand());
return null;
}
}).when(remotingClient).invokeAsync(anyString(),
any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
index fe07090d50..3227d1e1c6 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.acl.AccessValidator;
-import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.future.FutureTaskExt;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
@@ -51,10 +50,12 @@ import
org.apache.rocketmq.proxy.remoting.channel.RemotingChannelManager;
import org.apache.rocketmq.proxy.remoting.pipeline.AuthenticationPipeline;
import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
import org.apache.rocketmq.remoting.ChannelEventListener;
+import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.RequestTask;
+import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
@@ -239,12 +240,21 @@ public class RemotingProtocolServer implements
StartAndShutdown, RemotingProxyOu
long timeoutMillis) {
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
try {
- this.defaultRemotingServer.invokeAsync(channel, request,
timeoutMillis, responseFuture -> {
- if (responseFuture.getResponseCommand() == null) {
- future.completeExceptionally(new
MQClientException("response is null after send request to client",
responseFuture.getCause()));
- return;
+ this.defaultRemotingServer.invokeAsync(channel, request,
timeoutMillis, new InvokeCallback() {
+ @Override
+ public void operationComplete(ResponseFuture responseFuture) {
+
+ }
+
+ @Override
+ public void operationSucceed(RemotingCommand response) {
+ future.complete(response);
+ }
+
+ @Override
+ public void operationFail(Throwable throwable) {
+ future.completeExceptionally(throwable);
}
- future.complete(responseFuture.getResponseCommand());
});
} catch (Throwable t) {
future.completeExceptionally(t);
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExtTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExtTest.java
index 3f3a4ae40c..e2d05b0f5a 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExtTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExtTest.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -85,6 +86,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
@RunWith(MockitoJUnitRunner.class)
public class MQClientAPIExtTest {
@@ -109,13 +111,9 @@ public class MQClientAPIExtTest {
@Test
public void testSendHeartbeatAsync() throws Exception {
- doAnswer((Answer<Void>) mock -> {
- InvokeCallback invokeCallback = mock.getArgument(3);
- ResponseFuture responseFuture = new ResponseFuture(null, 0, 3000,
invokeCallback, null);
-
responseFuture.putResponse(RemotingCommand.createResponseCommand(ResponseCode.SUCCESS,
""));
- invokeCallback.operationComplete(responseFuture);
- return null;
- }).when(remotingClient).invokeAsync(anyString(),
any(RemotingCommand.class), anyLong(), any());
+ CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+
future.complete(RemotingCommand.createResponseCommand(ResponseCode.SUCCESS,
""));
+ doReturn(future).when(remotingClient).invoke(anyString(),
any(RemotingCommand.class), anyLong());
assertNotNull(mqClientAPI.sendHeartbeatAsync(BROKER_ADDR, new
HeartbeatData(), TIMEOUT).get());
}
@@ -123,20 +121,16 @@ public class MQClientAPIExtTest {
@Test
public void testSendMessageAsync() throws Exception {
AtomicReference<String> msgIdRef = new AtomicReference<>();
- doAnswer((Answer<Void>) mock -> {
- InvokeCallback invokeCallback = mock.getArgument(3);
- ResponseFuture responseFuture = new ResponseFuture(null, 0, 3000,
invokeCallback, null);
- RemotingCommand response =
RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
- SendMessageResponseHeader sendMessageResponseHeader =
(SendMessageResponseHeader) response.readCustomHeader();
- sendMessageResponseHeader.setMsgId(msgIdRef.get());
- sendMessageResponseHeader.setQueueId(0);
- sendMessageResponseHeader.setQueueOffset(1L);
- response.setCode(ResponseCode.SUCCESS);
- response.makeCustomHeaderToNet();
- responseFuture.putResponse(response);
- invokeCallback.operationComplete(responseFuture);
- return null;
- }).when(remotingClient).invokeAsync(anyString(),
any(RemotingCommand.class), anyLong(), any());
+ CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+ RemotingCommand response =
RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
+ SendMessageResponseHeader sendMessageResponseHeader =
(SendMessageResponseHeader) response.readCustomHeader();
+ sendMessageResponseHeader.setMsgId(msgIdRef.get());
+ sendMessageResponseHeader.setQueueId(0);
+ sendMessageResponseHeader.setQueueOffset(1L);
+ response.setCode(ResponseCode.SUCCESS);
+ response.makeCustomHeaderToNet();
+ future.complete(response);
+ doReturn(future).when(remotingClient).invoke(anyString(),
any(RemotingCommand.class), anyLong());
MessageExt messageExt = createMessage();
msgIdRef.set(MessageClientIDSetter.getUniqID(messageExt));
@@ -150,20 +144,16 @@ public class MQClientAPIExtTest {
@Test
public void testSendMessageListAsync() throws Exception {
- doAnswer((Answer<Void>) mock -> {
- InvokeCallback invokeCallback = mock.getArgument(3);
- ResponseFuture responseFuture = new ResponseFuture(null, 0, 3000,
invokeCallback, null);
- RemotingCommand response =
RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
- SendMessageResponseHeader sendMessageResponseHeader =
(SendMessageResponseHeader) response.readCustomHeader();
- sendMessageResponseHeader.setMsgId("");
- sendMessageResponseHeader.setQueueId(0);
- sendMessageResponseHeader.setQueueOffset(1L);
- response.setCode(ResponseCode.SUCCESS);
- response.makeCustomHeaderToNet();
- responseFuture.putResponse(response);
- invokeCallback.operationComplete(responseFuture);
- return null;
- }).when(remotingClient).invokeAsync(anyString(),
any(RemotingCommand.class), anyLong(), any());
+ CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+ RemotingCommand response =
RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
+ SendMessageResponseHeader sendMessageResponseHeader =
(SendMessageResponseHeader) response.readCustomHeader();
+ sendMessageResponseHeader.setMsgId("");
+ sendMessageResponseHeader.setQueueId(0);
+ sendMessageResponseHeader.setQueueOffset(1L);
+ response.setCode(ResponseCode.SUCCESS);
+ response.makeCustomHeaderToNet();
+ future.complete(response);
+ doReturn(future).when(remotingClient).invoke(anyString(),
any(RemotingCommand.class), anyLong());
List<MessageExt> messageExtList = new ArrayList<>();
StringBuilder sb = new StringBuilder();
@@ -182,13 +172,9 @@ public class MQClientAPIExtTest {
@Test
public void testSendMessageBackAsync() throws Exception {
- doAnswer((Answer<Void>) mock -> {
- InvokeCallback invokeCallback = mock.getArgument(3);
- ResponseFuture responseFuture = new ResponseFuture(null, 0, 3000,
invokeCallback, null);
-
responseFuture.putResponse(RemotingCommand.createResponseCommand(ResponseCode.SUCCESS,
""));
- invokeCallback.operationComplete(responseFuture);
- return null;
- }).when(remotingClient).invokeAsync(anyString(),
any(RemotingCommand.class), anyLong(), any());
+ CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+
future.complete(RemotingCommand.createResponseCommand(ResponseCode.SUCCESS,
""));
+ doReturn(future).when(remotingClient).invoke(anyString(),
any(RemotingCommand.class), anyLong());
RemotingCommand remotingCommand =
mqClientAPI.sendMessageBackAsync(BROKER_ADDR, new
ConsumerSendMsgBackRequestHeader(), TIMEOUT)
.get();
@@ -285,7 +271,7 @@ public class MQClientAPIExtTest {
body.setConsumerIdList(clientIds);
response.setBody(body.encode());
responseFuture.putResponse(response);
- invokeCallback.operationComplete(responseFuture);
+
invokeCallback.operationSucceed(responseFuture.getResponseCommand());
return null;
}).when(remotingClient).invokeAsync(anyString(),
any(RemotingCommand.class), anyLong(), any());
@@ -302,7 +288,7 @@ public class MQClientAPIExtTest {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.makeCustomHeaderToNet();
responseFuture.putResponse(response);
- invokeCallback.operationComplete(responseFuture);
+
invokeCallback.operationSucceed(responseFuture.getResponseCommand());
return null;
}).when(remotingClient).invokeAsync(anyString(),
any(RemotingCommand.class), anyLong(), any());
@@ -322,7 +308,7 @@ public class MQClientAPIExtTest {
response.setCode(ResponseCode.SUCCESS);
response.makeCustomHeaderToNet();
responseFuture.putResponse(response);
- invokeCallback.operationComplete(responseFuture);
+
invokeCallback.operationSucceed(responseFuture.getResponseCommand());
return null;
}).when(remotingClient).invokeAsync(anyString(),
any(RemotingCommand.class), anyLong(), any());
@@ -335,18 +321,15 @@ public class MQClientAPIExtTest {
@Test
public void testSearchOffsetAsync() throws Exception {
long offset = ThreadLocalRandom.current().nextLong();
- doAnswer((Answer<Void>) mock -> {
- InvokeCallback invokeCallback = mock.getArgument(3);
- ResponseFuture responseFuture = new ResponseFuture(null, 0, 3000,
invokeCallback, null);
- RemotingCommand response =
RemotingCommand.createResponseCommand(SearchOffsetResponseHeader.class);
- SearchOffsetResponseHeader responseHeader =
(SearchOffsetResponseHeader) response.readCustomHeader();
- responseHeader.setOffset(offset);
- response.setCode(ResponseCode.SUCCESS);
- response.makeCustomHeaderToNet();
- responseFuture.putResponse(response);
- invokeCallback.operationComplete(responseFuture);
- return null;
- }).when(remotingClient).invokeAsync(anyString(),
any(RemotingCommand.class), anyLong(), any());
+ CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+ RemotingCommand response =
RemotingCommand.createResponseCommand(SearchOffsetResponseHeader.class);
+ SearchOffsetResponseHeader responseHeader =
(SearchOffsetResponseHeader) response.readCustomHeader();
+ responseHeader.setOffset(offset);
+ response.setCode(ResponseCode.SUCCESS);
+ response.makeCustomHeaderToNet();
+ future.complete(response);
+
+ doReturn(future).when(remotingClient).invoke(anyString(),
any(RemotingCommand.class), anyLong());
SearchOffsetRequestHeader requestHeader = new
SearchOffsetRequestHeader();
requestHeader.setTopic(TOPIC);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/InvokeCallback.java b/remoting/src/main/java/org/apache/rocketmq/remoting/InvokeCallback.java
index ce78fa923f..6be4917457 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/InvokeCallback.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/InvokeCallback.java
@@ -17,7 +17,22 @@
package org.apache.rocketmq.remoting;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface InvokeCallback {
+ /**
+ * This method is expected to be invoked after {@link #operationSucceed(RemotingCommand)}
+ * or {@link #operationFail(Throwable)}
+ *
+ * @param responseFuture the returned object contains response or exception
+ */
void operationComplete(final ResponseFuture responseFuture);
+
+ default void operationSucceed(final RemotingCommand response) {
+
+ }
+
+ default void operationFail(final Throwable throwable) {
+
+ }
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
index ff0b3df95a..c8389eedb1 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
@@ -20,11 +20,11 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface RemotingClient extends RemotingService {
@@ -51,18 +51,21 @@ public interface RemotingClient extends RemotingService {
final long timeoutMillis) {
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
try {
- invokeAsync(addr, request, timeoutMillis, responseFuture -> {
- RemotingCommand response = responseFuture.getResponseCommand();
- if (response != null) {
+ invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
+
+ @Override
+ public void operationComplete(ResponseFuture responseFuture) {
+
+ }
+
+ @Override
+ public void operationSucceed(RemotingCommand response) {
future.complete(response);
- } else {
- if (!responseFuture.isSendRequestOK()) {
- future.completeExceptionally(new
RemotingSendRequestException(addr, responseFuture.getCause()));
- } else if (responseFuture.isTimeout()) {
- future.completeExceptionally(new
RemotingTimeoutException(addr, timeoutMillis, responseFuture.getCause()));
- } else {
- future.completeExceptionally(new
RemotingException(request.toString(), responseFuture.getCause()));
- }
+ }
+
+ @Override
+ public void operationFail(Throwable throwable) {
+ future.completeExceptionally(throwable);
}
});
} catch (Throwable t) {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index fce2de267f..12e66f913c 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -23,20 +23,23 @@ import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.opentelemetry.api.common.AttributesBuilder;
-import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.rocketmq.common.AbortProcessException;
@@ -125,7 +128,7 @@ public abstract class NettyRemotingAbstract {
* Constructor, specifying capacity of one-way and asynchronous semaphores.
*
* @param permitsOneway Number of permits for one-way requests.
- * @param permitsAsync Number of permits for asynchronous requests.
+ * @param permitsAsync Number of permits for asynchronous requests.
*/
public NettyRemotingAbstract(final int permitsOneway, final int
permitsAsync) {
this.semaphoreOneway = new Semaphore(permitsOneway, true);
@@ -367,8 +370,7 @@ public abstract class NettyRemotingAbstract {
responseFuture.release();
}
} else {
- log.warn("receive response, but not matched any request, " +
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
- log.warn(cmd.toString());
+ log.warn("receive response, cmd={}, but not matched any request,
address={}", cmd, RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
}
}
@@ -467,57 +469,68 @@ public abstract class NettyRemotingAbstract {
public RemotingCommand invokeSyncImpl(final Channel channel, final
RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException,
RemotingTimeoutException {
- //get the request id
- final int opaque = request.getOpaque();
-
try {
- final ResponseFuture responseFuture = new ResponseFuture(channel,
opaque, timeoutMillis, null, null);
- this.responseTable.put(opaque, responseFuture);
- final SocketAddress addr = channel.remoteAddress();
- channel.writeAndFlush(request).addListener((ChannelFutureListener)
f -> {
- if (f.isSuccess()) {
- responseFuture.setSendRequestOK(true);
- return;
- }
-
- responseFuture.setSendRequestOK(false);
- responseTable.remove(opaque);
- responseFuture.setCause(f.cause());
- responseFuture.putResponse(null);
- log.warn("Failed to write a request command to {}, caused by
underlying I/O operation failure", addr);
- });
+ return invokeImpl(channel, request,
timeoutMillis).thenApply(ResponseFuture::getResponseCommand)
+ .get(timeoutMillis, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw new
RemotingSendRequestException(channel.remoteAddress().toString(), e.getCause());
+ } catch (TimeoutException e) {
+ throw new
RemotingTimeoutException(channel.remoteAddress().toString(), timeoutMillis,
e.getCause());
+ }
+ }
- RemotingCommand responseCommand =
responseFuture.waitResponse(timeoutMillis);
- if (null == responseCommand) {
- if (responseFuture.isSendRequestOK()) {
- throw new
RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr),
timeoutMillis,
- responseFuture.getCause());
- } else {
- throw new
RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr),
responseFuture.getCause());
- }
+ public CompletableFuture<ResponseFuture> invokeImpl(final Channel channel,
final RemotingCommand request,
+ final long timeoutMillis) {
+ String channelRemoteAddr =
RemotingHelper.parseChannelRemoteAddr(channel);
+ doBeforeRpcHooks(channelRemoteAddr, request);
+ return invoke0(channel, request, timeoutMillis).whenComplete((v, t) ->
{
+ if (t == null) {
+ doAfterRpcHooks(channelRemoteAddr, request,
v.getResponseCommand());
}
-
- return responseCommand;
- } finally {
- this.responseTable.remove(opaque);
- }
+ });
}
- public void invokeAsyncImpl(final Channel channel, final RemotingCommand
request, final long timeoutMillis,
- final InvokeCallback invokeCallback)
- throws InterruptedException, RemotingTooMuchRequestException,
RemotingTimeoutException, RemotingSendRequestException {
+ protected CompletableFuture<ResponseFuture> invoke0(final Channel channel,
final RemotingCommand request,
+ final long timeoutMillis) {
+ CompletableFuture<ResponseFuture> future = new CompletableFuture<>();
long beginStartTime = System.currentTimeMillis();
final int opaque = request.getOpaque();
- boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis,
TimeUnit.MILLISECONDS);
+
+ boolean acquired;
+ try {
+ acquired = this.semaphoreAsync.tryAcquire(timeoutMillis,
TimeUnit.MILLISECONDS);
+ } catch (Throwable t) {
+ future.completeExceptionally(t);
+ return future;
+ }
if (acquired) {
final SemaphoreReleaseOnlyOnce once = new
SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
once.release();
- throw new RemotingTimeoutException("invokeAsyncImpl call
timeout");
+ future.completeExceptionally(new
RemotingTimeoutException("invokeAsyncImpl call timeout"));
+ return future;
}
- final ResponseFuture responseFuture = new ResponseFuture(channel,
opaque, timeoutMillis - costTime, invokeCallback, once);
+ AtomicReference<ResponseFuture> responseFutureReference = new
AtomicReference<>();
+ final ResponseFuture responseFuture = new ResponseFuture(channel,
opaque, request, timeoutMillis - costTime,
+ new InvokeCallback() {
+ @Override
+ public void operationComplete(ResponseFuture
responseFuture) {
+
+ }
+
+ @Override
+ public void operationSucceed(RemotingCommand response) {
+ future.complete(responseFutureReference.get());
+ }
+
+ @Override
+ public void operationFail(Throwable throwable) {
+ future.completeExceptionally(throwable);
+ }
+ }, once);
+ responseFutureReference.set(responseFuture);
this.responseTable.put(opaque, responseFuture);
try {
channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {
@@ -528,15 +541,17 @@ public abstract class NettyRemotingAbstract {
requestFail(opaque);
log.warn("send a request command to channel <{}> failed.",
RemotingHelper.parseChannelRemoteAddr(channel));
});
+ return future;
} catch (Exception e) {
responseTable.remove(opaque);
responseFuture.release();
log.warn("send a request command to channel <" +
RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
- throw new
RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
+ future.completeExceptionally(new
RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel),
e));
+ return future;
}
} else {
if (timeoutMillis <= 0) {
- throw new RemotingTooMuchRequestException("invokeAsyncImpl
invoke too fast");
+ future.completeExceptionally(new
RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast"));
} else {
String info =
String.format("invokeAsyncImpl tryAcquire semaphore
timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
@@ -545,11 +560,31 @@ public abstract class NettyRemotingAbstract {
this.semaphoreAsync.availablePermits()
);
log.warn(info);
- throw new RemotingTimeoutException(info);
+ future.completeExceptionally(new
RemotingTimeoutException(info));
}
+ return future;
}
}
+ public void invokeAsyncImpl(final Channel channel, final RemotingCommand
request, final long timeoutMillis,
+ final InvokeCallback invokeCallback) {
+ invokeImpl(channel, request, timeoutMillis)
+ .whenComplete((v, t) -> {
+ if (t == null) {
+ invokeCallback.operationComplete(v);
+ } else {
+ ResponseFuture responseFuture = new
ResponseFuture(channel, request.getOpaque(), request, timeoutMillis, null,
null);
+ responseFuture.setCause(t);
+ invokeCallback.operationComplete(responseFuture);
+ }
+ })
+ .thenAccept(responseFuture ->
invokeCallback.operationSucceed(responseFuture.getResponseCommand()))
+ .exceptionally(t -> {
+ invokeCallback.operationFail(t);
+ return null;
+ });
+ }
+
private void requestFail(final int opaque) {
ResponseFuture responseFuture = responseTable.remove(opaque);
if (responseFuture != null) {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 64621dd6c4..d784351a5f 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -527,15 +527,13 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
if (channel != null && channel.isActive()) {
long left = timeoutMillis;
try {
- doBeforeRpcHooks(channelRemoteAddr, request);
long costTime = System.currentTimeMillis() - beginStartTime;
left -= costTime;
if (left <= 0) {
throw new RemotingTimeoutException("invokeSync call the
addr[" + channelRemoteAddr + "] timeout");
}
RemotingCommand response = this.invokeSyncImpl(channel,
request, left);
- doAfterRpcHooks(channelRemoteAddr, request, response);
- this.updateChannelLastResponseTime(addr);
+ updateChannelLastResponseTime(addr);
return response;
} catch (RemotingSendRequestException e) {
LOGGER.warn("invokeSync: send request exception, so close the
channel[{}]", channelRemoteAddr);
@@ -727,18 +725,11 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
final Channel channel = this.getAndCreateChannel(addr);
String channelRemoteAddr =
RemotingHelper.parseChannelRemoteAddr(channel);
if (channel != null && channel.isActive()) {
- try {
- doBeforeRpcHooks(channelRemoteAddr, request);
- long costTime = System.currentTimeMillis() - beginStartTime;
- if (timeoutMillis < costTime) {
- throw new RemotingTooMuchRequestException("invokeAsync
call the addr[" + channelRemoteAddr + "] timeout");
- }
- this.invokeAsyncImpl(channel, request, timeoutMillis -
costTime, new InvokeCallbackWrapper(invokeCallback, addr));
- } catch (RemotingSendRequestException e) {
- LOGGER.warn("invokeAsync: send request exception, so close the
channel[{}]", channelRemoteAddr);
- this.closeChannel(addr, channel);
- throw e;
+ long costTime = System.currentTimeMillis() - beginStartTime;
+ if (timeoutMillis < costTime) {
+ throw new RemotingTooMuchRequestException("invokeAsync call
the addr[" + channelRemoteAddr + "] timeout");
}
+ this.invokeAsyncImpl(channel, request, timeoutMillis - costTime,
new InvokeCallbackWrapper(invokeCallback, addr));
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
@@ -931,11 +922,19 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
@Override
public void operationComplete(ResponseFuture responseFuture) {
- if (responseFuture != null && responseFuture.isSendRequestOK() &&
responseFuture.getResponseCommand() != null) {
- NettyRemotingClient.this.updateChannelLastResponseTime(addr);
- }
this.invokeCallback.operationComplete(responseFuture);
}
+
+ @Override
+ public void operationSucceed(RemotingCommand response) {
+ updateChannelLastResponseTime(addr);
+ this.invokeCallback.operationSucceed(response);
+ }
+
+ @Override
+ public void operationFail(final Throwable throwable) {
+ this.invokeCallback.operationFail(throwable);
+ }
}
class NettyClientHandler extends
SimpleChannelInboundHandler<RemotingCommand> {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
index 19f705d74b..0882818fea 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
@@ -22,6 +22,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class ResponseFuture {
@@ -59,6 +62,18 @@ public class ResponseFuture {
public void executeInvokeCallback() {
if (invokeCallback != null) {
if (this.executeCallbackOnlyOnce.compareAndSet(false, true)) {
+ RemotingCommand response = getResponseCommand();
+ if (response != null) {
+ invokeCallback.operationSucceed(response);
+ } else {
+ if (!isSendRequestOK()) {
+ invokeCallback.operationFail(new
RemotingSendRequestException(channel.remoteAddress().toString(), getCause()));
+ } else if (isTimeout()) {
+ invokeCallback.operationFail(new
RemotingTimeoutException(channel.remoteAddress().toString(),
getTimeoutMillis(), getCause()));
+ } else {
+ invokeCallback.operationFail(new
RemotingException(getRequestCommand().toString(), getCause()));
+ }
+ }
invokeCallback.operationComplete(this);
}
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/RpcClientImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/RpcClientImpl.java
index 133e0ed314..5328e8845d 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/RpcClientImpl.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/RpcClientImpl.java
@@ -160,31 +160,38 @@ public class RpcClientImpl implements RpcClient {
InvokeCallback callback = new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
- RemotingCommand responseCommand =
responseFuture.getResponseCommand();
- if (responseCommand == null) {
- processFailedResponse(addr, requestCommand,
responseFuture, rpcResponsePromise);
- return;
- }
+
+ }
+
+ @Override
+ public void operationSucceed(RemotingCommand response) {
try {
- switch (responseCommand.getCode()) {
+ switch (response.getCode()) {
case ResponseCode.SUCCESS:
case ResponseCode.PULL_NOT_FOUND:
case ResponseCode.PULL_RETRY_IMMEDIATELY:
case ResponseCode.PULL_OFFSET_MOVED:
PullMessageResponseHeader responseHeader =
- (PullMessageResponseHeader)
responseCommand.decodeCommandCustomHeader(PullMessageResponseHeader.class);
- rpcResponsePromise.setSuccess(new
RpcResponse(responseCommand.getCode(), responseHeader,
responseCommand.getBody()));
+ (PullMessageResponseHeader)
response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
+ rpcResponsePromise.setSuccess(new
RpcResponse(response.getCode(), responseHeader, response.getBody()));
default:
- RpcResponse rpcResponse = new RpcResponse(new
RpcException(responseCommand.getCode(), "unexpected remote response code"));
+ RpcResponse rpcResponse = new RpcResponse(new
RpcException(response.getCode(), "unexpected remote response code"));
rpcResponsePromise.setSuccess(rpcResponse);
}
} catch (Exception e) {
- String errorMessage = "process failed. addr: " + addr + ",
timeoutMillis: " + responseFuture.getTimeoutMillis() + ". Request: " +
requestCommand;
- RpcResponse rpcResponse = new RpcResponse(new
RpcException(ResponseCode.RPC_UNKNOWN, errorMessage, e));
+ String errorMessage = "process failed. addr: " + addr + ",
timeoutMillis: " + timeoutMillis + ". Request: " + requestCommand;
+ RpcResponse rpcResponse = new RpcResponse(new
RpcException(ResponseCode.RPC_UNKNOWN, errorMessage, e));
rpcResponsePromise.setSuccess(rpcResponse);
}
}
+
+ @Override
+ public void operationFail(Throwable throwable) {
+ String errorMessage = "process failed. addr: " + addr + ".
Request: " + requestCommand;
+ RpcResponse rpcResponse = new RpcResponse(new
RpcException(ResponseCode.RPC_UNKNOWN, errorMessage, throwable));
+ rpcResponsePromise.setSuccess(rpcResponse);
+ }
};
this.remotingClient.invokeAsync(addr, requestCommand, timeoutMillis,
callback);
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java
index 90072960b5..d0da0eb2ef 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java
@@ -26,12 +26,12 @@ import
org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
-import org.apache.rocketmq.remoting.netty.ResponseFuture;
-import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
+import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.junit.AfterClass;
@@ -40,7 +40,6 @@ import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
public class RemotingServerTest {
private static RemotingServer remotingServer;
@@ -122,10 +121,19 @@ public class RemotingServerTest {
remotingClient.invokeAsync("localhost:" +
remotingServer.localListenPort(), request, 1000 * 3, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
+
+ }
+
+ @Override
+ public void operationSucceed(RemotingCommand response) {
latch.countDown();
- assertTrue(responseFuture != null);
-
assertThat(responseFuture.getResponseCommand().getLanguage()).isEqualTo(LanguageCode.JAVA);
-
assertThat(responseFuture.getResponseCommand().getExtFields()).hasSize(2);
+
assertThat(response.getLanguage()).isEqualTo(LanguageCode.JAVA);
+ assertThat(response.getExtFields()).hasSize(2);
+ }
+
+ @Override
+ public void operationFail(Throwable throwable) {
+
}
});
latch.await();
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/BaseInvokeCallback.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/MockChannel.java
similarity index 57%
rename from client/src/main/java/org/apache/rocketmq/client/impl/BaseInvokeCallback.java
rename to remoting/src/test/java/org/apache/rocketmq/remoting/netty/MockChannel.java
index 80188832eb..8ddcdf35df 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/BaseInvokeCallback.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/MockChannel.java
@@ -15,23 +15,14 @@
* limitations under the License.
*/
-package org.apache.rocketmq.client.impl;
+package org.apache.rocketmq.remoting.netty;
-import org.apache.rocketmq.remoting.InvokeCallback;
-import org.apache.rocketmq.remoting.netty.ResponseFuture;
-
-public abstract class BaseInvokeCallback implements InvokeCallback {
- private final MQClientAPIImpl mqClientAPI;
-
- public BaseInvokeCallback(MQClientAPIImpl mqClientAPI) {
- this.mqClientAPI = mqClientAPI;
- }
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.local.LocalChannel;
+public class MockChannel extends LocalChannel {
@Override
- public void operationComplete(final ResponseFuture responseFuture) {
- mqClientAPI.execRpcHooksAfterRequest(responseFuture);
- onComplete(responseFuture);
+ public ChannelFuture writeAndFlush(Object msg) {
+ return new MockChannelPromise(MockChannel.this);
}
-
- public abstract void onComplete(final ResponseFuture responseFuture);
}
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/MockChannelPromise.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/MockChannelPromise.java
new file mode 100644
index 0000000000..9c3a354871
--- /dev/null
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/MockChannelPromise.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.netty;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelPromise;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.jetbrains.annotations.NotNull;
+
+public class MockChannelPromise implements ChannelPromise {
+ protected Channel channel;
+
+ public MockChannelPromise(Channel channel) {
+ this.channel = channel;
+ }
+
+ @Override
+ public Channel channel() {
+ return channel;
+ }
+
+ @Override
+ public ChannelPromise setSuccess(Void result) {
+ return this;
+ }
+
+ @Override
+ public ChannelPromise setSuccess() {
+ return this;
+ }
+
+ @Override
+ public boolean trySuccess() {
+ return false;
+ }
+
+ @Override
+ public ChannelPromise setFailure(Throwable cause) {
+ return this;
+ }
+
+ @Override
+ public ChannelPromise addListener(GenericFutureListener<? extends Future<?
super Void>> listener) {
+ return this;
+ }
+
+ @Override
+ public ChannelPromise addListeners(GenericFutureListener<? extends
Future<? super Void>>... listeners) {
+ return this;
+ }
+
+ @Override
+ public ChannelPromise removeListener(GenericFutureListener<? extends
Future<? super Void>> listener) {
+ return this;
+ }
+
+ @Override
+ public ChannelPromise removeListeners(GenericFutureListener<? extends
Future<? super Void>>... listeners) {
+ return this;
+ }
+
+ @Override
+ public ChannelPromise sync() throws InterruptedException {
+ return this;
+ }
+
+ @Override
+ public ChannelPromise syncUninterruptibly() {
+ return this;
+ }
+
+ @Override
+ public ChannelPromise await() throws InterruptedException {
+ return this;
+ }
+
+ @Override
+ public ChannelPromise awaitUninterruptibly() {
+ return this;
+ }
+
+ @Override
+ public ChannelPromise unvoid() {
+ return this;
+ }
+
+ @Override
+ public boolean isVoid() {
+ return false;
+ }
+
+ @Override
+ public boolean trySuccess(Void result) {
+ return false;
+ }
+
+ @Override
+ public boolean tryFailure(Throwable cause) {
+ return false;
+ }
+
+ @Override
+ public boolean setUncancellable() {
+ return false;
+ }
+
+ @Override
+ public boolean isSuccess() {
+ return false;
+ }
+
+ @Override
+ public boolean isCancellable() {
+ return false;
+ }
+
+ @Override
+ public Throwable cause() {
+ return null;
+ }
+
+ @Override
+ public boolean await(long timeout, TimeUnit unit) throws
InterruptedException {
+ return false;
+ }
+
+ @Override
+ public boolean await(long timeoutMillis) throws InterruptedException {
+ return false;
+ }
+
+ @Override
+ public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
+ return false;
+ }
+
+ @Override
+ public boolean awaitUninterruptibly(long timeoutMillis) {
+ return false;
+ }
+
+ @Override
+ public Void getNow() {
+ return null;
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ return false;
+ }
+
+ @Override
+ public Void get() throws InterruptedException, ExecutionException {
+ return null;
+ }
+
+ @Override
+ public Void get(long timeout,
+ @NotNull java.util.concurrent.TimeUnit unit) throws
InterruptedException, ExecutionException, TimeoutException {
+ return null;
+ }
+}
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java
index 8381c132b7..dbbea86ea2 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java
@@ -39,9 +39,19 @@ public class NettyRemotingAbstractTest {
final Semaphore semaphore = new Semaphore(0);
ResponseFuture responseFuture = new ResponseFuture(null, 1, 3000, new
InvokeCallback() {
@Override
- public void operationComplete(final ResponseFuture responseFuture)
{
+ public void operationComplete(ResponseFuture responseFuture) {
+
+ }
+
+ @Override
+ public void operationSucceed(RemotingCommand response) {
assertThat(semaphore.availablePermits()).isEqualTo(0);
}
+
+ @Override
+ public void operationFail(Throwable throwable) {
+
+ }
}, new SemaphoreReleaseOnlyOnce(semaphore));
remotingAbstract.responseTable.putIfAbsent(1, responseFuture);
@@ -75,9 +85,19 @@ public class NettyRemotingAbstractTest {
final Semaphore semaphore = new Semaphore(0);
ResponseFuture responseFuture = new ResponseFuture(null, 1, 3000, new
InvokeCallback() {
@Override
- public void operationComplete(final ResponseFuture responseFuture)
{
+ public void operationComplete(ResponseFuture responseFuture) {
+
+ }
+
+ @Override
+ public void operationSucceed(RemotingCommand response) {
assertThat(semaphore.availablePermits()).isEqualTo(0);
}
+
+ @Override
+ public void operationFail(Throwable throwable) {
+
+ }
}, new SemaphoreReleaseOnlyOnce(semaphore));
remotingAbstract.responseTable.putIfAbsent(1, responseFuture);
@@ -98,7 +118,18 @@ public class NettyRemotingAbstractTest {
// mock timeout
ResponseFuture responseFuture = new ResponseFuture(null, dummyId,
-1000, new InvokeCallback() {
@Override
- public void operationComplete(final ResponseFuture responseFuture) {
+ public void operationComplete(ResponseFuture responseFuture) {
+
+ }
+
+ @Override
+ public void operationSucceed(RemotingCommand response) {
+
+ }
+
+ @Override
+ public void operationFail(Throwable throwable) {
+
}
}, null);
remotingAbstract.responseTable.putIfAbsent(dummyId, responseFuture);
@@ -111,7 +142,22 @@ public class NettyRemotingAbstractTest {
final Semaphore semaphore = new Semaphore(0);
RemotingCommand request = RemotingCommand.createRequestCommand(1, null);
ResponseFuture responseFuture = new ResponseFuture(null, 1, request, 3000,
- responseFuture1 -> assertThat(semaphore.availablePermits()).isEqualTo(0), new SemaphoreReleaseOnlyOnce(semaphore));
+ new InvokeCallback() {
+ @Override
+ public void operationComplete(ResponseFuture responseFuture) {
+
+ }
+
+ @Override
+ public void operationSucceed(RemotingCommand response) {
+ assertThat(semaphore.availablePermits()).isEqualTo(0);
+ }
+
+ @Override
+ public void operationFail(Throwable throwable) {
+
+ }
+ }, new SemaphoreReleaseOnlyOnce(semaphore));
remotingAbstract.responseTable.putIfAbsent(1, responseFuture);
RemotingCommand response = RemotingCommand.createResponseCommand(0, "Foo");
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
index 8fabbb21d0..e72e7bd53e 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
@@ -16,10 +16,17 @@
*/
package org.apache.rocketmq.remoting.netty;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.local.LocalChannel;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
import org.apache.rocketmq.remoting.InvokeCallback;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
@@ -29,23 +36,33 @@ import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
@RunWith(MockitoJUnitRunner.class)
public class NettyRemotingClientTest {
@Spy
private NettyRemotingClient remotingClient = new NettyRemotingClient(new NettyClientConfig());
+ @Mock
+ private RPCHook rpcHookMock;
@Test
- public void testSetCallbackExecutor() throws NoSuchFieldException, IllegalAccessException {
+ public void testSetCallbackExecutor() {
ExecutorService customized = Executors.newCachedThreadPool();
remotingClient.setCallbackExecutor(customized);
assertThat(remotingClient.getCallbackExecutor()).isEqualTo(customized);
@@ -61,7 +78,7 @@ public class NettyRemotingClientTest {
InvokeCallback callback = invocation.getArgument(3);
ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null);
responseFuture.setResponseCommand(response);
- callback.operationComplete(responseFuture);
+ callback.operationSucceed(responseFuture.getResponseCommand());
return null;
}).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
@@ -78,9 +95,7 @@ public class NettyRemotingClientTest {
response.setCode(ResponseCode.SUCCESS);
doAnswer(invocation -> {
InvokeCallback callback = invocation.getArgument(3);
- ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null);
- responseFuture.setSendRequestOK(false);
- callback.operationComplete(responseFuture);
+ callback.operationFail(new RemotingSendRequestException(null));
return null;
}).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
@@ -97,8 +112,7 @@ public class NettyRemotingClientTest {
response.setCode(ResponseCode.SUCCESS);
doAnswer(invocation -> {
InvokeCallback callback = invocation.getArgument(3);
- ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), -1L, null, null);
- callback.operationComplete(responseFuture);
+ callback.operationFail(new RemotingTimeoutException(""));
return null;
}).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
@@ -115,8 +129,7 @@ public class NettyRemotingClientTest {
response.setCode(ResponseCode.SUCCESS);
doAnswer(invocation -> {
InvokeCallback callback = invocation.getArgument(3);
- ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null);
- callback.operationComplete(responseFuture);
+ callback.operationFail(new RemotingException(null));
return null;
}).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
@@ -134,4 +147,158 @@ public class NettyRemotingClientTest {
assertThat(e.getMessage()).contains(addr);
}
}
+
+ @Test
+ public void testInvoke0() throws ExecutionException, InterruptedException {
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+ RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ response.setCode(ResponseCode.SUCCESS);
+ Channel channel = new MockChannel() {
+ @Override
+ public ChannelFuture writeAndFlush(Object msg) {
+ ResponseFuture responseFuture = remotingClient.responseTable.get(request.getOpaque());
+ responseFuture.setResponseCommand(response);
+ responseFuture.executeInvokeCallback();
+ return super.writeAndFlush(msg);
+ }
+ };
+ CompletableFuture<ResponseFuture> future = remotingClient.invoke0(channel, request, 1000L);
+ assertThat(future.get().getResponseCommand()).isEqualTo(response);
+ }
+
+ @Test
+ public void testInvoke0WithException() {
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+ RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ response.setCode(ResponseCode.SUCCESS);
+ Channel channel = new MockChannel() {
+ @Override
+ public ChannelFuture writeAndFlush(Object msg) {
+ ResponseFuture responseFuture = remotingClient.responseTable.get(request.getOpaque());
+ responseFuture.executeInvokeCallback();
+ return super.writeAndFlush(msg);
+ }
+ };
+ CompletableFuture<ResponseFuture> future = remotingClient.invoke0(channel, request, 1000L);
+ assertThatThrownBy(future::get).getCause().isInstanceOf(RemotingException.class);
+ }
+
+ @Test
+ public void testInvokeSync() throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
+ remotingClient.registerRPCHook(rpcHookMock);
+
+ Channel channel = new LocalChannel();
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+ RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ response.setCode(ResponseCode.SUCCESS);
+ ResponseFuture responseFuture = new ResponseFuture(channel, request.getOpaque(), request, 1000, new InvokeCallback() {
+ @Override
+ public void operationComplete(ResponseFuture responseFuture) {
+
+ }
+ }, new SemaphoreReleaseOnlyOnce(new Semaphore(1)));
+ responseFuture.setResponseCommand(response);
+ CompletableFuture<ResponseFuture> future = new CompletableFuture<>();
+ future.complete(responseFuture);
+
+ doReturn(future).when(remotingClient).invoke0(any(Channel.class), any(RemotingCommand.class), anyLong());
+ RemotingCommand actual = remotingClient.invokeSyncImpl(channel, request, 1000);
+ assertThat(actual).isEqualTo(response);
+
+ verify(rpcHookMock).doBeforeRequest(anyString(), eq(request));
+ verify(rpcHookMock).doAfterResponse(anyString(), eq(request), eq(response));
+ }
+
+ @Test
+ public void testInvokeAsync() {
+ remotingClient.registerRPCHook(rpcHookMock);
+ Channel channel = new LocalChannel();
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+ RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ response.setCode(ResponseCode.SUCCESS);
+ ResponseFuture responseFuture = new ResponseFuture(channel, request.getOpaque(), request, 1000, new InvokeCallback() {
+ @Override
+ public void operationComplete(ResponseFuture responseFuture) {
+
+ }
+ }, new SemaphoreReleaseOnlyOnce(new Semaphore(1)));
+ responseFuture.setResponseCommand(response);
+ CompletableFuture<ResponseFuture> future = new CompletableFuture<>();
+ future.complete(responseFuture);
+
+ doReturn(future).when(remotingClient).invoke0(any(Channel.class), any(RemotingCommand.class), anyLong());
+
+ InvokeCallback callback = mock(InvokeCallback.class);
+ remotingClient.invokeAsyncImpl(channel, request, 1000, callback);
+ verify(callback, times(1)).operationSucceed(eq(response));
+ verify(callback, times(1)).operationComplete(eq(responseFuture));
+ verify(callback, never()).operationFail(any());
+
+ verify(rpcHookMock).doBeforeRequest(anyString(), eq(request));
+ verify(rpcHookMock).doAfterResponse(anyString(), eq(request), eq(response));
+ }
+
+ @Test
+ public void testInvokeAsyncFail() {
+ remotingClient.registerRPCHook(rpcHookMock);
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+
+ Channel channel = new LocalChannel();
+ CompletableFuture<ResponseFuture> future = new CompletableFuture<>();
+ future.completeExceptionally(new RemotingException(null));
+
+ doReturn(future).when(remotingClient).invoke0(any(Channel.class), any(RemotingCommand.class), anyLong());
+
+ InvokeCallback callback = mock(InvokeCallback.class);
+ remotingClient.invokeAsyncImpl(channel, request, 1000, callback);
+ verify(callback, never()).operationSucceed(any());
+ verify(callback, times(1)).operationComplete(any());
+ verify(callback, times(1)).operationFail(any());
+
+ verify(rpcHookMock).doBeforeRequest(anyString(), eq(request));
+ verify(rpcHookMock, never()).doAfterResponse(anyString(), eq(request), any());
+ }
+
+ @Test
+ public void testInvokeImpl() throws ExecutionException, InterruptedException {
+ remotingClient.registerRPCHook(rpcHookMock);
+ Channel channel = new LocalChannel();
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+ RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ response.setCode(ResponseCode.SUCCESS);
+ ResponseFuture responseFuture = new ResponseFuture(channel, request.getOpaque(), request, 1000, new InvokeCallback() {
+ @Override
+ public void operationComplete(ResponseFuture responseFuture) {
+
+ }
+ }, new SemaphoreReleaseOnlyOnce(new Semaphore(1)));
+ responseFuture.setResponseCommand(response);
+ CompletableFuture<ResponseFuture> future = new CompletableFuture<>();
+ future.complete(responseFuture);
+
+ doReturn(future).when(remotingClient).invoke0(any(Channel.class), any(RemotingCommand.class), anyLong());
+
+ CompletableFuture<ResponseFuture> future0 = remotingClient.invokeImpl(channel, request, 1000);
+ assertThat(future0.get()).isEqualTo(responseFuture);
+
+ verify(rpcHookMock).doBeforeRequest(anyString(), eq(request));
+ verify(rpcHookMock).doAfterResponse(anyString(), eq(request), eq(response));
+ }
+
+ @Test
+ public void testInvokeImplFail() {
+ remotingClient.registerRPCHook(rpcHookMock);
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+
+ Channel channel = new LocalChannel();
+ CompletableFuture<ResponseFuture> future = new CompletableFuture<>();
+ future.completeExceptionally(new RemotingException(null));
+
+ doReturn(future).when(remotingClient).invoke0(any(Channel.class), any(RemotingCommand.class), anyLong());
+
+ assertThatThrownBy(() -> remotingClient.invokeImpl(channel, request, 1000).get()).getCause().isInstanceOf(RemotingException.class);
+
+ verify(rpcHookMock).doBeforeRequest(anyString(), eq(request));
+ verify(rpcHookMock, never()).doAfterResponse(anyString(), eq(request), any());
+ }
}