This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d750461f3c [ISSUE #9857] Optimize the performance of the notification
by removing unnecessary retry checks (#9858)
d750461f3c is described below
commit d750461f3c7b4c5b45f13baac31ff8f3809d51ae
Author: qianye <[email protected]>
AuthorDate: Tue Nov 18 20:58:36 2025 +0800
[ISSUE #9857] Optimize the performance of the notification by removing
unnecessary retry checks (#9858)
Change-Id: Iddb8841defd4a33e65080c2451df466d045e4a23
---
.../broker/processor/NotificationProcessor.java | 29 +++++++---------------
1 file changed, 9 insertions(+), 20 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
index 640d77c298..6028093443 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
@@ -135,9 +135,16 @@ public class NotificationProcessor implements
NettyRequestProcessor {
}
int randomQ = random.nextInt(100);
boolean hasMsg = false;
- boolean needRetry = randomQ % 5 == 0;
BrokerConfig brokerConfig = brokerController.getBrokerConfig();
- if (needRetry) {
+ if (requestHeader.getQueueId() < 0) {
+ // read all queue
+ hasMsg = hasMsgFromTopic(topicConfig, randomQ, requestHeader);
+ } else {
+ int queueId = requestHeader.getQueueId();
+ hasMsg = hasMsgFromQueue(topicConfig.getTopicName(),
requestHeader, queueId);
+ }
+ // if it doesn't have message, fetch retry
+ if (!hasMsg) {
String retryTopic =
KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),
requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2());
hasMsg = hasMsgFromTopic(retryTopic, randomQ, requestHeader);
if (!hasMsg && brokerConfig.isEnableRetryTopicV2() &&
brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
@@ -145,24 +152,6 @@ public class NotificationProcessor implements
NettyRequestProcessor {
hasMsg = hasMsgFromTopic(retryTopicConfigV1, randomQ,
requestHeader);
}
}
- if (!hasMsg) {
- if (requestHeader.getQueueId() < 0) {
- // read all queue
- hasMsg = hasMsgFromTopic(topicConfig, randomQ, requestHeader);
- } else {
- int queueId = requestHeader.getQueueId();
- hasMsg = hasMsgFromQueue(topicConfig.getTopicName(),
requestHeader, queueId);
- }
- // if it doesn't have message, fetch retry again
- if (!needRetry && !hasMsg) {
- String retryTopic =
KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),
requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2());
- hasMsg = hasMsgFromTopic(retryTopic, randomQ, requestHeader);
- if (!hasMsg && brokerConfig.isEnableRetryTopicV2() &&
brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
- String retryTopicConfigV1 =
KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(),
requestHeader.getConsumerGroup());
- hasMsg = hasMsgFromTopic(retryTopicConfigV1, randomQ,
requestHeader);
- }
- }
- }
if (!hasMsg) {
PollingResult pollingResult = popLongPollingService.polling(ctx,
request, new PollingHeader(requestHeader));