fingerLr opened a new issue, #1140: URL: https://github.com/apache/rocketmq-clients/issues/1140
### Before Creating the Bug Report - [x] I found a bug, not just asking a question, which should be created in [GitHub Discussions](https://github.com/apache/rocketmq-clients/discussions). - [x] I have searched the [GitHub Issues](https://github.com/apache/rocketmq-clients/issues) and [GitHub Discussions](https://github.com/apache/rocketmq-clients/discussions) of this repository and believe that this is not a duplicate. - [x] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ. ### Programming Language of the Client Java ### Runtime Platform Environment windows11 ### RocketMQ Version of the Client/Server 5.0.2 ### Run or Compiler Version JDK17 ### Describe the Bug ProducerImpl# private void send0(SettableFuture<List<SendReceiptImpl>> future0, String topic, MessageType messageType, final List<MessageQueueImpl> candidates, final List<PublishingMessageImpl> messages, final int attempt) { // Calculate the current message queue. final MessageQueueImpl mq = candidates.get(IntMath.mod(attempt - 1, candidates.size())); final List<MessageType> acceptMessageTypes = mq.getAcceptMessageTypes(); if (publishingSettings.isValidateMessageType() && !acceptMessageTypes.contains(messageType)) { final IllegalArgumentException e = new IllegalArgumentException("Current message type not match with " + "topic accept message types, topic=" + topic + ", actualMessageType=" + messageType + ", " + "acceptMessageTypes=" + acceptMessageTypes); future0.setException(e); return; } final Endpoints endpoints = mq.getBroker().getEndpoints(); final ListenableFuture<List<SendReceiptImpl>> future = send0(endpoints, messages, mq); final int maxAttempts = this.getRetryPolicy().getMaxAttempts(); // Intercept before message publishing. 
final List<GeneralMessage> generalMessages = messages.stream().map((Function<PublishingMessageImpl, GeneralMessage>) GeneralMessageImpl::new).collect(Collectors.toList()); final MessageInterceptorContextImpl context = new MessageInterceptorContextImpl(MessageHookPoints.SEND); doBefore(context, generalMessages); Futures.addCallback(future, new FutureCallback<List<SendReceiptImpl>>() { @Override public void onSuccess(List<SendReceiptImpl> sendReceipts) { // Should never reach here. if (sendReceipts.size() != messages.size()) { final InternalErrorException e = new InternalErrorException("[Bug] due to an" + " unknown reason from remote, received send receipt's quantity " + sendReceipts.size() + " is not equal to sent message's quantity " + messages.size()); future0.setException(e); // Intercept after message publishing. final MessageInterceptorContextImpl context0 = new MessageInterceptorContextImpl(context, MessageHookPointsStatus.ERROR); doAfter(context0, generalMessages); return; } // Intercept after message publishing. final MessageInterceptorContextImpl context0 = new MessageInterceptorContextImpl(context, MessageHookPointsStatus.OK); doAfter(context0, generalMessages); // No need more attempts. future0.set(sendReceipts); // Resend message(s) successfully. if (1 < attempt) { // Collect messageId(s) for logging. List<MessageId> messageIds = new ArrayList<>(); for (SendReceipt receipt : sendReceipts) { messageIds.add(receipt.getMessageId()); } LOGGER.info("Resend message successfully, topic={}, messageId(s)={}, maxAttempts={}, " + "attempt={}, endpoints={}, clientId={}", topic, messageIds, maxAttempts, attempt, endpoints, clientId); } // Send message(s) successfully on first attempt, return directly. } @Override public void onFailure(Throwable t) { // Intercept after message publishing. 
final MessageInterceptorContextImpl context0 = new MessageInterceptorContextImpl(context, MessageHookPointsStatus.ERROR); doAfter(context0, generalMessages); // Collect messageId(s) for logging. List<MessageId> messageIds = new ArrayList<>(); for (PublishingMessageImpl message : messages) { messageIds.add(message.getMessageId()); } // Isolate endpoints because of sending failure. isolate(endpoints); if (attempt >= maxAttempts) { // No need more attempts. future0.setException(t); LOGGER.error("Failed to send message(s) finally, run out of attempt times, maxAttempts={}, " + "attempt={}, topic={}, messageId(s)={}, endpoints={}, clientId={}", maxAttempts, attempt, topic, messageIds, endpoints, clientId, t); return; } // No need more attempts for transactional message. if (MessageType.TRANSACTION.equals(messageType)) { future0.setException(t); LOGGER.error("Failed to send transactional message finally, maxAttempts=1, attempt={}, " + "topic={}, messageId(s)={}, endpoints={}, clientId={}", attempt, topic, messageIds, endpoints, clientId, t); return; } // Try to do more attempts. int nextAttempt = 1 + attempt; // Retry immediately if the request is not throttled. 
if (!(t instanceof TooManyRequestsException)) { LOGGER.warn("Failed to send message, would attempt to resend right now, maxAttempts={}, " + "attempt={}, topic={}, messageId(s)={}, endpoints={}, clientId={}", maxAttempts, attempt, topic, messageIds, endpoints, clientId, t); send0(future0, topic, messageType, candidates, messages, nextAttempt); return; } final Duration delay = ProducerImpl.this.getRetryPolicy().getNextAttemptDelay(nextAttempt); LOGGER.warn("Failed to send message due to too many requests, would attempt to resend after {}, " + "maxAttempts={}, attempt={}, topic={}, messageId(s)={}, endpoints={}, clientId={}", delay, maxAttempts, attempt, topic, messageIds, endpoints, clientId, t); ProducerImpl.this.getClientManager().getScheduler().schedule(() -> send0(future0, topic, messageType, candidates, messages, nextAttempt), delay.toNanos(), TimeUnit.NANOSECONDS); } }, clientCallbackExecutor); } 这里的before是不是存在调用时机错误? 感觉像是没注意。 ProducerImpl# private void send0(SettableFuture<List<SendReceiptImpl>> future0, String topic, MessageType messageType, final List<MessageQueueImpl> candidates, final List<PublishingMessageImpl> messages, final int attempt) { // Calculate the current message queue. final MessageQueueImpl mq = candidates.get(IntMath.mod(attempt - 1, candidates.size())); final List<MessageType> acceptMessageTypes = mq.getAcceptMessageTypes(); if (publishingSettings.isValidateMessageType() && !acceptMessageTypes.contains(messageType)) { final IllegalArgumentException e = new IllegalArgumentException("Current message type not match with " + "topic accept message types, topic=" + topic + ", actualMessageType=" + messageType + ", " + "acceptMessageTypes=" + acceptMessageTypes); future0.setException(e); return; } final Endpoints endpoints = mq.getBroker().getEndpoints(); final ListenableFuture<List<SendReceiptImpl>> future = send0(endpoints, messages, mq); final int maxAttempts = this.getRetryPolicy().getMaxAttempts(); // Intercept before message publishing. 
final List<GeneralMessage> generalMessages = messages.stream().map((Function<PublishingMessageImpl, GeneralMessage>) GeneralMessageImpl::new).collect(Collectors.toList()); final MessageInterceptorContextImpl context = new MessageInterceptorContextImpl(MessageHookPoints.SEND); doBefore(context, generalMessages); Futures.addCallback(future, new FutureCallback<List<SendReceiptImpl>>() { @Override public void onSuccess(List<SendReceiptImpl> sendReceipts) { // Should never reach here. if (sendReceipts.size() != messages.size()) { final InternalErrorException e = new InternalErrorException("[Bug] due to an" + " unknown reason from remote, received send receipt's quantity " + sendReceipts.size() + " is not equal to sent message's quantity " + messages.size()); future0.setException(e); // Intercept after message publishing. final MessageInterceptorContextImpl context0 = new MessageInterceptorContextImpl(context, MessageHookPointsStatus.ERROR); doAfter(context0, generalMessages); return; } // Intercept after message publishing. final MessageInterceptorContextImpl context0 = new MessageInterceptorContextImpl(context, MessageHookPointsStatus.OK); doAfter(context0, generalMessages); // No need more attempts. future0.set(sendReceipts); // Resend message(s) successfully. if (1 < attempt) { // Collect messageId(s) for logging. List<MessageId> messageIds = new ArrayList<>(); for (SendReceipt receipt : sendReceipts) { messageIds.add(receipt.getMessageId()); } LOGGER.info("Resend message successfully, topic={}, messageId(s)={}, maxAttempts={}, " + "attempt={}, endpoints={}, clientId={}", topic, messageIds, maxAttempts, attempt, endpoints, clientId); } // Send message(s) successfully on first attempt, return directly. } @Override public void onFailure(Throwable t) { // Intercept after message publishing. 
final MessageInterceptorContextImpl context0 = new MessageInterceptorContextImpl(context, MessageHookPointsStatus.ERROR); doAfter(context0, generalMessages); // Collect messageId(s) for logging. List<MessageId> messageIds = new ArrayList<>(); for (PublishingMessageImpl message : messages) { messageIds.add(message.getMessageId()); } // Isolate endpoints because of sending failure. isolate(endpoints); if (attempt >= maxAttempts) { // No need more attempts. future0.setException(t); LOGGER.error("Failed to send message(s) finally, run out of attempt times, maxAttempts={}, " + "attempt={}, topic={}, messageId(s)={}, endpoints={}, clientId={}", maxAttempts, attempt, topic, messageIds, endpoints, clientId, t); return; } // No need more attempts for transactional message. if (MessageType.TRANSACTION.equals(messageType)) { future0.setException(t); LOGGER.error("Failed to send transactional message finally, maxAttempts=1, attempt={}, " + "topic={}, messageId(s)={}, endpoints={}, clientId={}", attempt, topic, messageIds, endpoints, clientId, t); return; } // Try to do more attempts. int nextAttempt = 1 + attempt; // Retry immediately if the request is not throttled. 
if (!(t instanceof TooManyRequestsException)) { LOGGER.warn("Failed to send message, would attempt to resend right now, maxAttempts={}, " + "attempt={}, topic={}, messageId(s)={}, endpoints={}, clientId={}", maxAttempts, attempt, topic, messageIds, endpoints, clientId, t); send0(future0, topic, messageType, candidates, messages, nextAttempt); return; } final Duration delay = ProducerImpl.this.getRetryPolicy().getNextAttemptDelay(nextAttempt); LOGGER.warn("Failed to send message due to too many requests, would attempt to resend after {}, " + "maxAttempts={}, attempt={}, topic={}, messageId(s)={}, endpoints={}, clientId={}", delay, maxAttempts, attempt, topic, messageIds, endpoints, clientId, t); ProducerImpl.this.getClientManager().getScheduler().schedule(() -> send0(future0, topic, messageType, candidates, messages, nextAttempt), delay.toNanos(), TimeUnit.NANOSECONDS); } }, clientCallbackExecutor); } 这里的 doBefore 是不是存在调用时机错误? 感觉像是没注意。 doBefore 的调用时机是在 final ListenableFuture<List<SendReceiptImpl>> future = send0(endpoints, messages, mq); 之后? 既然是在发送消息之前拦截,应该是在其之前吧? 对照了 MessageInterceptor#doBefore 的其他实现,都是在 ListenableFuture<List<SendReceiptImpl>> future = send0(endpoints, messages, mq); 之前。 ### Steps to Reproduce null ### What Did You Expect to See? 期望是正确的调用顺序 ### What Did You See Instead? 非正常调用顺序 ### Additional Context null -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
