This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new d750461f3c [ISSUE #9857] Optimize the performance of the notification 
by removing unnecessary retry checks (#9858)
d750461f3c is described below

commit d750461f3c7b4c5b45f13baac31ff8f3809d51ae
Author: qianye <[email protected]>
AuthorDate: Tue Nov 18 20:58:36 2025 +0800

    [ISSUE #9857] Optimize the performance of the notification by removing 
unnecessary retry checks (#9858)
    
    Change-Id: Iddb8841defd4a33e65080c2451df466d045e4a23
---
 .../broker/processor/NotificationProcessor.java    | 29 +++++++---------------
 1 file changed, 9 insertions(+), 20 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
index 640d77c298..6028093443 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
@@ -135,9 +135,16 @@ public class NotificationProcessor implements 
NettyRequestProcessor {
         }
         int randomQ = random.nextInt(100);
         boolean hasMsg = false;
-        boolean needRetry = randomQ % 5 == 0;
         BrokerConfig brokerConfig = brokerController.getBrokerConfig();
-        if (needRetry) {
+        if (requestHeader.getQueueId() < 0) {
+            // read all queue
+            hasMsg = hasMsgFromTopic(topicConfig, randomQ, requestHeader);
+        } else {
+            int queueId = requestHeader.getQueueId();
+            hasMsg = hasMsgFromQueue(topicConfig.getTopicName(), 
requestHeader, queueId);
+        }
+        // if it doesn't have message, fetch retry
+        if (!hasMsg) {
             String retryTopic = 
KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), 
requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2());
             hasMsg = hasMsgFromTopic(retryTopic, randomQ, requestHeader);
             if (!hasMsg && brokerConfig.isEnableRetryTopicV2() && 
brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
@@ -145,24 +152,6 @@ public class NotificationProcessor implements 
NettyRequestProcessor {
                 hasMsg = hasMsgFromTopic(retryTopicConfigV1, randomQ, 
requestHeader);
             }
         }
-        if (!hasMsg) {
-            if (requestHeader.getQueueId() < 0) {
-                // read all queue
-                hasMsg = hasMsgFromTopic(topicConfig, randomQ, requestHeader);
-            } else {
-                int queueId = requestHeader.getQueueId();
-                hasMsg = hasMsgFromQueue(topicConfig.getTopicName(), 
requestHeader, queueId);
-            }
-            // if it doesn't have message, fetch retry again
-            if (!needRetry && !hasMsg) {
-                String retryTopic = 
KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), 
requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2());
-                hasMsg = hasMsgFromTopic(retryTopic, randomQ, requestHeader);
-                if (!hasMsg && brokerConfig.isEnableRetryTopicV2() && 
brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
-                    String retryTopicConfigV1 = 
KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), 
requestHeader.getConsumerGroup());
-                    hasMsg = hasMsgFromTopic(retryTopicConfigV1, randomQ, 
requestHeader);
-                }
-            }
-        }
 
         if (!hasMsg) {
             PollingResult pollingResult = popLongPollingService.polling(ctx, 
request, new PollingHeader(requestHeader));

Reply via email to