lwclover opened a new issue #3556:
URL: https://github.com/apache/rocketmq/issues/3556


   The RocketMQ client updates topic route information every 30 seconds.
   When a broker goes down, the client may still select it until the route is refreshed, and it cannot connect to that broker. In this case an asynchronous send is not retried; it fails with a RemotingConnectException.
   
   The exception and stack trace:
   ```
   org.apache.rocketmq.client.exception.MQClientException: Send [1] times, still failed, cost [5]ms, Topic: SSP_VC_COMMAND_CFG_PULL_PUSH_RESPOND_RECORD, BrokersSent: [broker-ssp-11]
   See http://rocketmq.apache.org/docs/faq/ for further details.
        at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:610)
        at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.access$300(DefaultMQProducerImpl.java:87)
        at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl$2.run(DefaultMQProducerImpl.java:467)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <x.x.x.x10911> failed
        at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeAsync(NettyRemotingClient.java:540)
        at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageAsync(MQClientAPIImpl.java:375)
        at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage$original$hWukCcmf(MQClientAPIImpl.java:332)
        at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage$original$hWukCcmf$accessor$wRKVBfSV(MQClientAPIImpl.java)
        at org.apache.rocketmq.client.impl.MQClientAPIImpl$auxiliary$5onkBFUr.call(Unknown Source)
        at org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter.intercept(InstMethodsInter.java:86)
        at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java)
        at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:765)
        at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:529)
        ... 7 common frames omitted
   ```
   
   The RemotingConnectException is raised by `throw new RemotingConnectException(addr);` in the else branch of NettyRemotingClient.invokeAsync:
   ```
       @Override
       public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
           throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
           RemotingSendRequestException {
           long beginStartTime = System.currentTimeMillis();
           final Channel channel = this.getAndCreateChannel(addr);
           if (channel != null && channel.isActive()) {
               try {
                   doBeforeRpcHooks(addr, request);
                   long costTime = System.currentTimeMillis() - beginStartTime;
                   if (timeoutMillis < costTime) {
                       throw new RemotingTooMuchRequestException("invokeAsync call timeout");
                   }
                   this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
               } catch (RemotingSendRequestException e) {
                   log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
                   this.closeChannel(addr, channel);
                   throw e;
               }
           } else {
               this.closeChannel(addr, channel);
               throw new RemotingConnectException(addr);
           }
       }
   ```
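
To make the control flow above easier to see, here is a minimal, self-contained sketch of the same guard. It is not RocketMQ code: `ConnectFailureSketch`, `ConnectException`, `Callback` and `callbackReached` are hypothetical stand-ins invented for illustration. It only shows that when the channel is not active, the exception goes straight back to the caller and the callback passed to `invokeAsync` is never involved.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class ConnectFailureSketch {
    /** Hypothetical stand-in for RemotingConnectException. */
    static final class ConnectException extends Exception {
        ConnectException(String addr) {
            super("connect to <" + addr + "> failed");
        }
    }

    /** Hypothetical stand-in for InvokeCallback. */
    interface Callback {
        void operationComplete(String response);
    }

    /**
     * Mirrors the shape of the guard in invokeAsync: an active channel hands
     * the request to the callback path, an inactive one throws to the caller.
     */
    static void invokeAsync(String addr, boolean channelActive, Callback callback)
            throws ConnectException {
        if (channelActive) {
            callback.operationComplete("response from " + addr);
        } else {
            throw new ConnectException(addr);
        }
    }

    /** Returns true if the callback ran, false if the call failed before reaching it. */
    static boolean callbackReached(String addr, boolean channelActive) {
        AtomicBoolean reached = new AtomicBoolean(false);
        try {
            invokeAsync(addr, channelActive, response -> reached.set(true));
        } catch (ConnectException e) {
            // The failure is reported to the caller directly, not through the callback.
        }
        return reached.get();
    }
}
```

With an inactive channel `callbackReached` returns false, so any retry has to be decided by the code that called `invokeAsync`.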
   For CommunicationMode.ASYNC the total number of attempts (timesTotal) in sendDefaultImpl is 1, so when a RemotingConnectException is thrown the send is not retried:
   ```
       private SendResult sendDefaultImpl(
           Message msg,
           final CommunicationMode communicationMode,
           final SendCallback sendCallback,
           final long timeout
       ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
           this.makeSureStateOK();
           Validators.checkMessage(msg, this.defaultMQProducer);
           final long invokeID = random.nextLong();
           long beginTimestampFirst = System.currentTimeMillis();
           long beginTimestampPrev = beginTimestampFirst;
           long endTimestamp = beginTimestampFirst;
           TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
           if (topicPublishInfo != null && topicPublishInfo.ok()) {
               boolean callTimeout = false;
               MessageQueue mq = null;
               Exception exception = null;
               SendResult sendResult = null;
               int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
               int times = 0;
               String[] brokersSent = new String[timesTotal];
               for (; times < timesTotal; times++) {
                   String lastBrokerName = null == mq ? null : mq.getBrokerName();
                   MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                   if (mqSelected != null) {
                       mq = mqSelected;
                       brokersSent[times] = mq.getBrokerName();
                       try {
                           beginTimestampPrev = System.currentTimeMillis();
                           if (times > 0) {
                               //Reset topic with namespace during resend.
                               msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                           }
                           long costTime = beginTimestampPrev - beginTimestampFirst;
                           if (timeout < costTime) {
                               callTimeout = true;
                               break;
                           }

                           sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                           endTimestamp = System.currentTimeMillis();
                           this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                           switch (communicationMode) {
                               case ASYNC:
                                   return null;
                               case ONEWAY:
                                   return null;
                               case SYNC:
                                   if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                       if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                           continue;
                                       }
                                   }

                                   return sendResult;
                               default:
                                   break;
                           }
                       } catch (RemotingException | MQClientException e) {
                           endTimestamp = System.currentTimeMillis();
                           this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                           log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                           log.warn(msg.toString());
                           exception = e;
                           continue;
                       } catch (MQBrokerException e) {
                           endTimestamp = System.currentTimeMillis();
                           this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                           log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                           log.warn(msg.toString());
                           exception = e;
                           if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {
                               continue;
                           } else {
                               if (sendResult != null) {
                                   return sendResult;
                               }

                               throw e;
                           }
                       } catch (InterruptedException e) {
                           endTimestamp = System.currentTimeMillis();
                           this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                           log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                           log.warn(msg.toString());

                           log.warn("sendKernelImpl exception", e);
                           log.warn(msg.toString());
                           throw e;
                       }
                   } else {
                       break;
                   }
               }

               if (sendResult != null) {
                   return sendResult;
               }

               String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                   times,
                   System.currentTimeMillis() - beginTimestampFirst,
                   msg.getTopic(),
                   Arrays.toString(brokersSent));

               info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

               MQClientException mqClientException = new MQClientException(info, exception);
               if (callTimeout) {
                   throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
               }

               if (exception instanceof MQBrokerException) {
                   mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
               } else if (exception instanceof RemotingConnectException) {
                   mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
               } else if (exception instanceof RemotingTimeoutException) {
                   mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
               } else if (exception instanceof MQClientException) {
                   mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
               }

               throw mqClientException;
           }

           validateNameServerSetting();

           throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
               null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
       }
   ```
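
The effect of the `timesTotal` expression can be reproduced in isolation. The sketch below is a simplified model, not the producer implementation. `AsyncRetrySketch` and `brokersTried` are hypothetical names. The retry value 2 and the broker names in the usage note are illustrative only, and the round-robin broker choice stands in for `selectOneMessageQueue`. It assumes every connection attempt fails, which is the case where the loop in `sendDefaultImpl` would `continue`.

```java
import java.util.ArrayList;
import java.util.List;

final class AsyncRetrySketch {
    enum CommunicationMode { SYNC, ASYNC, ONEWAY }

    /** Same expression as timesTotal in sendDefaultImpl. */
    static int timesTotal(CommunicationMode mode, int retryTimesWhenSendFailed) {
        return mode == CommunicationMode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
    }

    /**
     * Simulates the send loop when every broker refuses the connection and
     * returns the brokers that were attempted, in order.
     * Broker selection is plain round-robin here (an assumption of this sketch).
     */
    static List<String> brokersTried(CommunicationMode mode,
                                     int retryTimesWhenSendFailed,
                                     List<String> brokers) {
        List<String> tried = new ArrayList<>();
        int total = timesTotal(mode, retryTimesWhenSendFailed);
        for (int times = 0; times < total; times++) {
            String broker = brokers.get(times % brokers.size());
            tried.add(broker);
            // Connection failed: the real loop records the exception and continues.
        }
        return tried;
    }
}
```

With brokers `broker-a` and `broker-b` and a retry value of 2, SYNC mode attempts three sends across both brokers, while ASYNC mode attempts only `broker-a` and then gives up. This matches the "Send [1] times, still failed" message in the stack trace.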

