shengminw commented on code in PR #4553:
URL: https://github.com/apache/rocketmq/pull/4553#discussion_r913442814
##########
client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java:
##########
@@ -1047,37 +1052,42 @@ public void send(final Message msg, final MessageQueue
mq, final SendCallback se
throws MQClientException, RemotingException, InterruptedException {
final long beginStartTime = System.currentTimeMillis();
ExecutorService executor = this.getAsyncSenderExecutor();
- try {
- executor.submit(new Runnable() {
- @Override
- public void run() {
- try {
- makeSureStateOK();
- Validators.checkMessage(msg, defaultMQProducer);
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ makeSureStateOK();
+ Validators.checkMessage(msg, defaultMQProducer);
- if (!msg.getTopic().equals(mq.getTopic())) {
- throw new MQClientException("message's topic not
equal mq's topic", null);
- }
- long costTime = System.currentTimeMillis() -
beginStartTime;
- if (timeout > costTime) {
- try {
- sendKernelImpl(msg, mq,
CommunicationMode.ASYNC, sendCallback, null,
+ if (!msg.getTopic().equals(mq.getTopic())) {
+ throw new MQClientException("message's topic not equal
mq's topic", null);
+ }
+ long costTime = System.currentTimeMillis() -
beginStartTime;
+ if (timeout > costTime) {
+ try {
+ sendKernelImpl(msg, mq, CommunicationMode.ASYNC,
sendCallback, null,
timeout - costTime);
- } catch (MQBrokerException e) {
- throw new MQClientException("unknown
exception", e);
- }
- } else {
- sendCallback.onException(new
RemotingTooMuchRequestException("call timeout"));
+ } catch (MQBrokerException e) {
+ throw new MQClientException("unknown exception",
e);
}
- } catch (Exception e) {
- sendCallback.onException(e);
+ } else {
+ sendCallback.onException(new
RemotingTooMuchRequestException("call timeout"));
}
-
+ } catch (Exception e) {
+ sendCallback.onException(e);
}
- });
- } catch (RejectedExecutionException e) {
- throw new MQClientException("executor rejected ", e);
+ }
+ };
+
+ if (this.defaultMQProducer.isEnableBackpressureForAsyncMode() &&
this.asyncSenderExecutor == null) {
+ runnable.run();
+ } else {
+ try {
+ executor.submit(runnable);
+ } catch (RejectedExecutionException e) {
+ throw new MQClientException("executor rejected ", e);
+ }
}
Review Comment:
@lizhiboo Well, time cost of message sending is mainly in the IO operation.
So the IO operation remains in an asynchronous mode the sending process can
still be regarded as an asynchronous process.
As for your second question, IMO, from the users' view, when they revieve
the "RejectedExecutionException", they need to solve this problem by
themselves, always just sleep the thread for a while. However, this may not be
a good solution. We can implement the backpressure in mq better than user
implementing by themselves, just throw the exception when "time callout".
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]