shengminw commented on code in PR #4553:
URL: https://github.com/apache/rocketmq/pull/4553#discussion_r913442814


##########
client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java:
##########
@@ -1047,37 +1052,42 @@ public void send(final Message msg, final MessageQueue 
mq, final SendCallback se
         throws MQClientException, RemotingException, InterruptedException {
         final long beginStartTime = System.currentTimeMillis();
         ExecutorService executor = this.getAsyncSenderExecutor();
-        try {
-            executor.submit(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        makeSureStateOK();
-                        Validators.checkMessage(msg, defaultMQProducer);
+        Runnable runnable = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    makeSureStateOK();
+                    Validators.checkMessage(msg, defaultMQProducer);
 
-                        if (!msg.getTopic().equals(mq.getTopic())) {
-                            throw new MQClientException("message's topic not 
equal mq's topic", null);
-                        }
-                        long costTime = System.currentTimeMillis() - 
beginStartTime;
-                        if (timeout > costTime) {
-                            try {
-                                sendKernelImpl(msg, mq, 
CommunicationMode.ASYNC, sendCallback, null,
+                    if (!msg.getTopic().equals(mq.getTopic())) {
+                        throw new MQClientException("message's topic not equal 
mq's topic", null);
+                    }
+                    long costTime = System.currentTimeMillis() - 
beginStartTime;
+                    if (timeout > costTime) {
+                        try {
+                            sendKernelImpl(msg, mq, CommunicationMode.ASYNC, 
sendCallback, null,
                                     timeout - costTime);
-                            } catch (MQBrokerException e) {
-                                throw new MQClientException("unknown 
exception", e);
-                            }
-                        } else {
-                            sendCallback.onException(new 
RemotingTooMuchRequestException("call timeout"));
+                        } catch (MQBrokerException e) {
+                            throw new MQClientException("unknown exception", 
e);
                         }
-                    } catch (Exception e) {
-                        sendCallback.onException(e);
+                    } else {
+                        sendCallback.onException(new 
RemotingTooMuchRequestException("call timeout"));
                     }
-
+                } catch (Exception e) {
+                    sendCallback.onException(e);
                 }
 
-            });
-        } catch (RejectedExecutionException e) {
-            throw new MQClientException("executor rejected ", e);
+            }
+        };
+
+        if (this.defaultMQProducer.isEnableBackpressureForAsyncMode()  && 
this.asyncSenderExecutor == null) {
+            runnable.run();
+        } else {
+            try {
+                executor.submit(runnable);
+            } catch (RejectedExecutionException e) {
+                throw new MQClientException("executor rejected ", e);
+            }
         }

Review Comment:
   @lizhiboo Well, time cost of message sending is mainly in the IO operation. 
So the IO operation remains in an asynchronous mode the sending process can 
still be regarded as an asynchronous process.
   As for your second question, from the users' view, when they revieve the 
"RejectedExecutionException", they need to solve this problem by themselves, 
always just sleep the thread for a while. However, this may not be a good 
solution. We can implement the backpressure in mq better than user implementing 
by themselves, just throw the exception when "time callout". 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to