fingerLr opened a new issue, #1139:
URL: https://github.com/apache/rocketmq-clients/issues/1139

   ### Before Creating the Bug Report
   
   - [x] I found a bug, not just asking a question, which should be created in [GitHub Discussions](https://github.com/apache/rocketmq-clients/discussions).
   
   - [x] I have searched the [GitHub Issues](https://github.com/apache/rocketmq-clients/issues) and [GitHub Discussions](https://github.com/apache/rocketmq-clients/discussions) of this repository and believe that this is not a duplicate.
   
   - [x] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.
   
   
   ### Programming Language of the Client
   
   Java
   
   ### Runtime Platform Environment
   
   Windows 11
   
   ### RocketMQ Version of the Client/Server
   
   5.0.2
   
   ### Run or Compiler Version
   
   JDK 17
   
   ### Describe the Bug
   
   ProducerImpl#
   private void send0(SettableFuture<List<SendReceiptImpl>> future0, String topic, MessageType messageType,
           final List<MessageQueueImpl> candidates, final List<PublishingMessageImpl> messages, final int attempt) {
           // Calculate the current message queue.
           final MessageQueueImpl mq = candidates.get(IntMath.mod(attempt - 1, candidates.size()));
           final List<MessageType> acceptMessageTypes = mq.getAcceptMessageTypes();
           if (publishingSettings.isValidateMessageType() && !acceptMessageTypes.contains(messageType)) {
               final IllegalArgumentException e = new IllegalArgumentException("Current message type not match with "
                   + "topic accept message types, topic=" + topic + ", actualMessageType=" + messageType + ", "
                   + "acceptMessageTypes=" + acceptMessageTypes);
               future0.setException(e);
               return;
           }
           final Endpoints endpoints = mq.getBroker().getEndpoints();
           final ListenableFuture<List<SendReceiptImpl>> future = send0(endpoints, messages, mq);
           final int maxAttempts = this.getRetryPolicy().getMaxAttempts();
   
           // Intercept before message publishing.
           final List<GeneralMessage> generalMessages = messages.stream().map((Function<PublishingMessageImpl,
               GeneralMessage>) GeneralMessageImpl::new).collect(Collectors.toList());
           final MessageInterceptorContextImpl context = new MessageInterceptorContextImpl(MessageHookPoints.SEND);
           doBefore(context, generalMessages);
   
           Futures.addCallback(future, new FutureCallback<List<SendReceiptImpl>>() {
               @Override
               public void onSuccess(List<SendReceiptImpl> sendReceipts) {
                   // Should never reach here.
                   if (sendReceipts.size() != messages.size()) {
                       final InternalErrorException e = new InternalErrorException("[Bug] due to an"
                           + " unknown reason from remote, received send receipt's quantity " + sendReceipts.size()
                           + " is not equal to sent message's quantity " + messages.size());
                       future0.setException(e);
   
                       // Intercept after message publishing.
                       final MessageInterceptorContextImpl context0 = new MessageInterceptorContextImpl(context,
                           MessageHookPointsStatus.ERROR);
                       doAfter(context0, generalMessages);
   
                       return;
                   }
                   // Intercept after message publishing.
                   final MessageInterceptorContextImpl context0 = new MessageInterceptorContextImpl(context,
                       MessageHookPointsStatus.OK);
                   doAfter(context0, generalMessages);
   
                   // No need more attempts.
                   future0.set(sendReceipts);
                   // Resend message(s) successfully.
                   if (1 < attempt) {
                       // Collect messageId(s) for logging.
                       List<MessageId> messageIds = new ArrayList<>();
                       for (SendReceipt receipt : sendReceipts) {
                           messageIds.add(receipt.getMessageId());
                       }
                       LOGGER.info("Resend message successfully, topic={}, messageId(s)={}, maxAttempts={}, "
                               + "attempt={}, endpoints={}, clientId={}", topic, messageIds, maxAttempts, attempt,
                           endpoints, clientId);
                   }
                   // Send message(s) successfully on first attempt, return directly.
               }
   
               @Override
               public void onFailure(Throwable t) {
                   // Intercept after message publishing.
                   final MessageInterceptorContextImpl context0 = new MessageInterceptorContextImpl(context,
                       MessageHookPointsStatus.ERROR);
                   doAfter(context0, generalMessages);
   
                   // Collect messageId(s) for logging.
                   List<MessageId> messageIds = new ArrayList<>();
                   for (PublishingMessageImpl message : messages) {
                       messageIds.add(message.getMessageId());
                   }
                   // Isolate endpoints because of sending failure.
                   isolate(endpoints);
                   if (attempt >= maxAttempts) {
                       // No need more attempts.
                       future0.setException(t);
                       LOGGER.error("Failed to send message(s) finally, run out of attempt times, maxAttempts={}, " +
                               "attempt={}, topic={}, messageId(s)={}, endpoints={}, clientId={}",
                           maxAttempts, attempt, topic, messageIds, endpoints, clientId, t);
                       return;
                   }
                   // No need more attempts for transactional message.
                   if (MessageType.TRANSACTION.equals(messageType)) {
                       future0.setException(t);
                       LOGGER.error("Failed to send transactional message finally, maxAttempts=1, attempt={}, " +
                               "topic={}, messageId(s)={}, endpoints={}, clientId={}", attempt, topic, messageIds,
                           endpoints, clientId, t);
                       return;
                   }
                   // Try to do more attempts.
                   int nextAttempt = 1 + attempt;
                   // Retry immediately if the request is not throttled.
                   if (!(t instanceof TooManyRequestsException)) {
                       LOGGER.warn("Failed to send message, would attempt to resend right now, maxAttempts={}, "
                               + "attempt={}, topic={}, messageId(s)={}, endpoints={}, clientId={}", maxAttempts, attempt,
                           topic, messageIds, endpoints, clientId, t);
                       send0(future0, topic, messageType, candidates, messages, nextAttempt);
                       return;
                   }
                   final Duration delay = ProducerImpl.this.getRetryPolicy().getNextAttemptDelay(nextAttempt);
                   LOGGER.warn("Failed to send message due to too many requests, would attempt to resend after {}, "
                           + "maxAttempts={}, attempt={}, topic={}, messageId(s)={}, endpoints={}, clientId={}", delay,
                       maxAttempts, attempt, topic, messageIds, endpoints, clientId, t);
                   ProducerImpl.this.getClientManager().getScheduler().schedule(() -> send0(future0, topic, messageType,
                       candidates, messages, nextAttempt), delay.toNanos(), TimeUnit.NANOSECONDS);
               }
           }, clientCallbackExecutor);
       }
   
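   Is `doBefore` called at the wrong time here? It looks like an oversight. `doBefore` is invoked after `final ListenableFuture<List<SendReceiptImpl>> future = send0(endpoints, messages, mq);`.
   Since this hook is meant to intercept before the message is sent, shouldn't it run before that line? I compared this with the other implementations of `MessageInterceptor#doBefore`, and in all of them the hook runs before `ListenableFuture<List<SendReceiptImpl>> future = send0(endpoints, messages, mq);`.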
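
The ordering in question can be illustrated with a minimal, self-contained sketch. It does not use the RocketMQ client classes: `HookOrderSketch`, `sendToBroker`, and the simplified `doBefore`/`doAfter` are hypothetical stand-ins, and the JDK's `CompletableFuture` replaces Guava's `ListenableFuture`. It only shows the order this report expects, with the before-hook running first, then the send, then the after-hook in the completion callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the producer; not the real ProducerImpl.
final class HookOrderSketch {
    // Records the order in which the hooks and the send call run.
    private final List<String> events = new ArrayList<>();

    // Simplified before-hook (assumption: the real hook takes a context and messages).
    private void doBefore() {
        events.add("doBefore");
    }

    // Simplified after-hook that records whether the send succeeded.
    private void doAfter(boolean ok) {
        events.add(ok ? "doAfter:OK" : "doAfter:ERROR");
    }

    // Stand-in for the network send; it completes immediately with a fake receipt.
    private CompletableFuture<String> sendToBroker(String message) {
        events.add("send");
        return CompletableFuture.completedFuture("receipt-for-" + message);
    }

    // Expected ordering: before-hook, then the send, then the after-hook on completion.
    List<String> sendWithHooks(String message) {
        doBefore();
        CompletableFuture<String> future = sendToBroker(message);
        future.whenComplete((receipt, error) -> doAfter(error == null));
        return events;
    }

    public static void main(String[] args) {
        // prints [doBefore, send, doAfter:OK]
        System.out.println(new HookOrderSketch().sendWithHooks("hello"));
    }
}
```

In the method quoted above, the equivalent of `sendToBroker` is called before `doBefore`, so a before-hook would see the message only after the request has been issued.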
   
   ### Steps to Reproduce
   
   null
   
   ### What Did You Expect to See?
   
   The correct call order: `doBefore` runs before the message is sent.
   
   ### What Did You See Instead?
   
   An incorrect call order: `doBefore` runs after `send0(endpoints, messages, mq)` has already been called.
   
   ### Additional Context
   
   null

